pytest中的Cele_worker fixture,连接已关闭错误



我有这样的pytest结构

import pytest
@pytest.mark.django_db
class TestClass:
def test_celery_mht_notification_create(self, celery_worker, user):
# some test logic

当我使用celery_worker夹具时,我得到这样的错误psycopg2.InterfaceError: connection already closed

如何解决?

根据他们在pytest-django:上创建的问题中给您的解决方案

另一方面,我的一个解决方法是作为事务测试运行@pytest.mark.django_db(transaction=True)

在第一个链接的评论线程中再挖掘一点,IMHO是一个更干净的解决方案,并解释了为什么会发生这种情况:

如果您正在使用例如Py.test并希望使用进程中的worker,则可以执行类似的操作

def pytest_configure():
from celery.fixups.django import DjangoWorkerFixup
DjangoWorkerFixup.install = lambda x: None

禁用辅助修复程序,这在上下文中是不必要的。

最新更新