当烧瓶服务器运行另一个线程时,Pytest 挂起



我正在使用Python3,Flask 0.12和Pytest 3.0.7。

我有一个类似于这样的烧瓶应用程序:

class AppInitializer:
    def __init__(self):
        pass
    @staticmethod
    def __function_to_be_refreshed():
        while True:
            try:
                time.sleep(5)
            except Exception as e:
                logger.exception(e)

    def create_app(self):
        daemon_thread = threading.Thread(target=self.__function_to_be_refreshed)
        daemon_thread.daemon = True
        daemon_thread.start()
        atexit.register(daemon_thread.join)
        app_ = Flask(__name__)
        return app_

app_initializer = AppInitializer()
app = app_initializer.create_app()

我正在尝试使用 pytest 测试此应用程序,如下所示:

import unittest
import pytest

class TestCheckPriceRequestAPI(unittest.TestCase):
    def setUp(self):
        self.app = api.app.test_client()
    def test_random(self):
        pass

当我使用 pytest 运行此测试时,此测试(以及所有其他测试(成功运行,但 pytest 挂起。如何停止正在运行的pytest进程(或者可能杀死守护进程线程(?

join 命令仅表示线程将等到线程完成,但不会完成它。要以无限循环结束线程,您可以这样做:

class AppInitializer:
    def __init__(self):
        self._run = True
        self._daemon_thread = None
    def __function_to_be_refreshed(self):
        while self._run:
            try:
                time.sleep(5)
            except Exception as e:
                logger.exception(e)
    def __shutdown(self):
        self._run = False
        if self._daemon_thread:
            self._daemon_thread.join()
    def create_app(self):
        self._daemon_thread = threading.Thread(target=self.__function_to_be_refreshed)
        ...
        atexit.register(self.__shutdown)
        ...

相关内容

  • 没有找到相关文章

最新更新