使用Tornado Asynchttptestcase连接超时进行测试



我在单元测试龙卷风应用程序时有问题,请帮助我。错误堆栈跟踪:

错误追溯(最近的呼叫上次):文件 "/users/doc/python/lib/python3.5/site-packages/tornado/testing.py", 第432行,拆除 超时= get_async_test_timeout())文件"/users/doc/python-virt1/lib/python3.5/site-packages/tornado/ioloop.py", 第456行,在run_sync中 提高timeouterror("在%s秒后的操作" tornado.ioloop.timeouterror:操作时间超时 秒

错误:龙卷风:未检索未来的异常:追溯(最新的 last):文件 "/users/doc/python/lib/python3.5/site-packages/tornado/gen.py", 第1021行,跑步 屈服= self.gen.throw(*exc_info)文件"/library/frameworks/python.framework/versions/3.5/lib/python3.5/types.py", 第179行,投掷 返回self .__包装(tp, *rest)文件"/users/doc/python-virt1/lib/python3.5/site-packages/tornado/gen.py", 第1015行,运行 value = future.result()文件"/users/doc/python-virt1/lib/python3.5/site-packages/tornado/concurrent.py", 第237行,结果 rish_exc_info(self._exc_info)文件",第3行,在rise_exc_info tornado.curl_httpclient.curlerror.curl_httpclient.curlerror:http 599:空 从服务器追溯回复(最近的最新通话):

test.py文件:

from tornado.testing import gen_test
from tests.api_tests.base import AbstractApplicationTestBase

class ApiRestTest(AbstractApplicationTestBase):
    def setUp(self):
        super(ApiRestTest, self).setUp()
        self.prepareDatabase(self.config)
        self.insert_user(config=self.config)

api_test/base.py

import logging
from api import server
from commons.constants import config
from tests.base import BaseTestClass

class AbstractApplicationTestBase(BaseTestClass):
    def get_app(self):
        application = server.get_application(self.config)
        application.settings[config.APPLICATION_DB] = self.db
        application.settings[config.APPLICATION_CONFIG] = self.config
        application.settings[config.APPLICATION_AES] = self.aes
        application.settings[config.APPLICATION_FS] = self.fs
        logging.info(self.config.DEPLOY_API)
        return application

test/base.py

import logging
from datetime import datetime
import motor.motor_tornado
from motor import MotorGridFSBucket
from pymongo import MongoClient
from tornado import escape
from tornado import gen
from tornado.testing import AsyncHTTPTestCase
class BaseTestClass(AsyncHTTPTestCase):
     @classmethod
     def setUpClass(self):
         super(BaseTestClass, self).setUpClass()
         self.config = Config(Environment.TESTS.value)
         self.client = utils.http_client(self.config.PROXY_HOST, self.config.PROXY_PORT)
         self.db = motor.motor_tornado.MotorClient(self.config.MONGODB_URI)[self.config.MONGODB_NAME]
         self.fs = MotorGridFSBucket(self.db)

asynchttptestcase在每个测试的开头创建一个新的iOloop,并在每个测试的结束时将其销毁。但是,您正在整个测试类的开头创建一个电动机,并使用默认的全局IOLOOP而不是专门为每个测试创建的IOLOOP。

我相信您只需要用设置替换setupclass。然后,在Asynchttptestcase设置其IOLOOP之后,您将创建电动机。为了清楚起见,明确通过Ioloop:

client = MotorClient(io_loop=self.io_loop)
self.db = client[self.config.MONGODB_NAME]

我注意到的几件事。

  • 我看到的主要问题是,您在设置方法中通过电机在那里进行一些IO,并且设置不能是gen_test(afaik)。如果您需要此类型的功能,则可能需要落到Pymongo并同步致电数据库以使这些数据库固定装置固定。
  • 您是否有意反对真实数据库?这些应该是与MongoDB的真实集成测试吗?当我编写这些类型的测试时,我通常会使用模拟类并嘲笑我与MongoDB的所有互动。
  • 另外,创建AsynchttpClient对象没有成本,因此将每个处理程序从您的设置/配置对象传递到每个处理程序可能不是最佳练习。

这是我的项目中的示例处理程序测试:

example_fixture = [{'foo': 'bar'}]
URL = r'/list'
    class BaseListHandlerTests(BaseHandlerTestCase):
        """
        Test the abstract list handler
        """
        def setUp(self):
            self.mongo_client = Mock()
            self.fixture = deepcopy(example_fixture)
            # Must be run last
            BaseHandlerTestCase.setUp(self)
        def get_app(self):
            return Application([
                (URL, BaseListHandler,
                 dict(mongo_client=self.mongo_client))
            ], **settings)
        def test_get_list_of_objects_returns_200_with_results(self):
            self.mongo_client.find.return_value = self.get_future(example_fixture)
            response = self.fetch('{}'.format(URL))
            response_json = self.to_json(response)
            self.assertListEqual(response_json.get('results'), example_fixture)
            self.assertEqual(response.code, 200)
        def test_get_list_of_objects_returns_200_with_no_results(self):
            self.mongo_client.find.return_value = self.get_future([])
            response = self.fetch('{}'.format(URL))
            self.assertEqual(response.code, 200)
        def test_get_list_of_objects_returns_500_with_exception(self):
            self.mongo_client.find.return_value = self.get_future_with_exception(Exception('FAILED!'))
            response = self.fetch('{}'.format(URL))
            self.assertEqual(response.code, 500)

进行这项工作的关键是我的mongo_client被传递到路由对象本身。因此,我的处理程序初始化采用mongo_client kwarg。

class BaseListHandler(BaseHandler):
    """
    Base list handler
    """
    mongo_client = None
    def initialize(self, mongo_client=None):
        """
        Rest Client Initialize
        Args:
            mongo_client: The client used to access documents for this handler
        Returns:
        """
        BaseHandler.initialize(self)
        self.mongo_client = mongo_client

最新更新