我正在尝试在我的Django应用程序中集成测试并发芹菜任务。我希望该任务在pytest集成测试期间实际上并发地运行在工作人员上,但我在制作这项工作时遇到了麻烦。
假设我有以下基本的芹菜任务:
@shared_task
def sleep_task(secs):
print(f"Sleeping for {secs} seconds...")
for i in range(secs):
time.sleep(i)
print(f"t{i + 1}")
return "DONE"
在脚本中并发运行任务可以正常工作:
# script
sleep_task.delay(3)
sleep_task.delay(3)
# output
Sleeping for 3 seconds...
Sleeping for 3 seconds...
1
1
2
2
3
3
但是,我不能在单元测试中复制这种异步行为。在conftest.py
中,我设置代理和结果后端:
import pytest
pytest_plugins = ("celery.contrib.pytest",)
@pytest.fixture(scope="session")
def celery_config():
return {"broker_url": "memory://", "result_backend": "rpc://"}
这是我的单元测试。celery_session_app
和celery_session_worker
设置用于测试的芹菜应用程序和工作器(文档):
def test_concurrent_sleep_tasks(celery_session_app, celery_session_worker):
sleep_task.delay(3)
sleep_task.delay(3)
# output
Sleeping for 3 seconds...
1
2
3
Sleeping for 3 seconds...
1
2
3
看起来任务正在同步运行。是否存在异步运行任务的方法?
celery_session_worker fixture正在启动一个并发性为1的芹菜worker。这在芹菜中是硬编码的。它目前是不可配置的。由于并发性为1,它一次只处理1个任务。
另一种解决方案是编写自己的fixture来启动并发数为2或更多的worker。