我有这样的pytest结构
import pytest
@pytest.mark.django_db
class TestClass:
def test_celery_mht_notification_create(self, celery_worker, user):
# some test logic
当我使用celery_worker
夹具时,我得到这样的错误psycopg2.InterfaceError: connection already closed
如何解决?
根据他们在pytest-django
:上创建的问题中给您的解决方案
另一方面,我的一个解决方法是作为事务测试运行
@pytest.mark.django_db(transaction=True)
在第一个链接的评论线程中再挖掘一点,IMHO是一个更干净的解决方案,并解释了为什么会发生这种情况:
如果您正在使用例如Py.test并希望使用进程中的worker,则可以执行类似的操作
def pytest_configure():
from celery.fixups.django import DjangoWorkerFixup
DjangoWorkerFixup.install = lambda x: None
禁用辅助修复程序,这在上下文中是不必要的。