但没有解释如果你不使用Django,如何测试Celery任务。你是怎么做到的?
可以使用任何 unittest lib 同步测试任务。在处理芹菜任务时,我通常会进行 2 个不同的测试会话。第一个(正如我建议的(是完全同步的,应该是确保算法做它应该做的事情的那个。第二个会话使用整个系统(包括代理(,并确保我没有序列化问题或任何其他分发、通信问题。
所以:
from celery import Celery
celery = Celery()
@celery.task
def add(x, y):
return x + y
而您的测试:
from nose.tools import eq_
def test_add_task():
rst = add.apply(args=(4, 4)).get()
eq_(rst, 8)
以下是我七岁答案的更新:
您可以通过pytest fixture
在单独的线程中运行工作线程:
https://docs.celeryq.dev/en/v5.2.6/userguide/testing.html#celery-worker-embed-live-worker
根据文档,您不应该使用"always_eager"
(请参阅上述链接的页面顶部(。
旧答案:
我使用这个:
with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
...
文档:https://docs.celeryq.dev/en/3.1/configuration.html#celery-always-eager
CELERY_ALWAYS_EAGER
允许您同步运行任务,并且不需要芹菜服务器。
取决于您到底要测试什么。
- 直接测试任务代码。 不要调用"task.delay(...(",只需从单元测试中调用"task(...("。
- 使用CELERY_ALWAYS_EAGER。这将导致您的任务在您说"task.delay(...("时立即被调用,因此您可以测试整个路径(但不能测试任何异步行为(。
unittest
import unittest
from myproject.myapp import celeryapp
class TestMyCeleryWorker(unittest.TestCase):
def setUp(self):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
py.测试夹具
# conftest.py
from myproject.myapp import celeryapp
@pytest.fixture(scope='module')
def celery_app(request):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
return celeryapp
# test_tasks.py
def test_some_task(celery_app):
...
附录:让send_task尊重渴望
from celery import current_app
def send_task(name, args=(), kwargs={}, **opts):
# https://github.com/celery/celery/issues/581
task = current_app.tasks[name]
return task.apply(args, kwargs, **opts)
current_app.send_task = send_task
对于那些使用Celery 4的人来说,它是:
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
由于设置名称已更改,如果选择升级,则需要更新,请参阅
https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names
从 Celery 3.0 开始,在 Django 中设置CELERY_ALWAYS_EAGER
的一种方法是:
from django.test import TestCase, override_settings
from .foo import foo_celery_task
class MyTest(TestCase):
@override_settings(CELERY_ALWAYS_EAGER=True)
def test_foo(self):
self.assertTrue(foo_celery_task.delay())
从 Celery v4.0 开始,提供了 py.test 夹具来启动芹菜工作线程,仅用于测试,并在完成后关闭:
def test_myfunc_is_executed(celery_session_worker):
# celery_session_worker: <Worker: gen93553@mymachine.local (running)>
assert myfunc.delay().wait(3)
在 http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test 上描述的其他灯具中,您可以通过以这种方式重新定义celery_config
灯具来更改芹菜默认选项:
@pytest.fixture(scope='session')
def celery_config():
return {
'accept_content': ['json', 'pickle'],
'result_serializer': 'pickle',
}
默认情况下,测试工作线程使用内存中代理和结果后端。如果不测试特定功能,则无需使用本地 Redis 或 RabbitMQ。
参考使用 pytest。
def test_add(celery_worker):
mytask.delay()
如果使用 Flask,请设置应用配置
CELERY_BROKER_URL = 'memory://'
CELERY_RESULT_BACKEND = 'cache+memory://'
并在conftest.py
@pytest.fixture
def app():
yield app # Your actual Flask application
@pytest.fixture
def celery_app(app):
from celery.contrib.testing import tasks # need it
yield celery_app # Your actual Flask-Celery application
就我而言(我假设许多其他人(,我想要的只是使用 pytest 测试任务的内在逻辑。
博士最终嘲笑了一切(选项 2(
示例用例:
proj/tasks.py
@shared_task(bind=True)
def add_task(self, a, b):
return a+b;
tests/test_tasks.py
from proj import add_task
def test_add():
assert add_task(1, 2) == 3, '1 + 2 should equal 3'
但是,由于shared_task
装饰器做了很多芹菜内部逻辑,所以它并不是真正的单元测试。
所以,对我来说,有两个选择:
选项 1:独立的内部逻辑
proj/tasks_logic.py
def internal_add(a, b):
return a + b;
proj/tasks.py
from .tasks_logic import internal_add
@shared_task(bind=True)
def add_task(self, a, b):
return internal_add(a, b);
这看起来很奇怪,除了降低可读性之外,它还需要手动提取和传递作为请求一部分的属性,例如task_id
,以防您需要它,这使得逻辑不那么纯粹。
选项2:模拟
嘲笑芹菜内部
tests/__init__.py
# noinspection PyUnresolvedReferences
from celery import shared_task
from mock import patch
def mock_signature(**kwargs):
return {}
def mocked_shared_task(*decorator_args, **decorator_kwargs):
def mocked_shared_decorator(func):
func.signature = func.si = func.s = mock_signature
return func
return mocked_shared_decorator
patch('celery.shared_task', mocked_shared_task).start()
然后允许我模拟请求对象(同样,如果您需要请求中的内容,例如 ID 或重试计数器。
tests/test_tasks.py
from proj import add_task
class MockedRequest:
def __init__(self, id=None):
self.id = id or 1
class MockedTask:
def __init__(self, id=None):
self.request = MockedRequest(id=id)
def test_add():
mocked_task = MockedTask(id=3)
assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'
这个解决方案更加手动,但是,它为我提供了实际单元测试所需的控件,而无需重复自己,也不会丢失芹菜范围。
我在单元测试方法中看到很多CELERY_ALWAYS_EAGER = true
作为单元测试的解决方案,但是由于版本5.0.5可用,因此有很多更改,这使得大多数旧答案被弃用,对我来说是一个耗时的废话,所以对于这里搜索解决方案的每个人,请转到文档并阅读新版本的有据可查的单元测试示例:
https://docs.celeryproject.org/en/stable/userguide/testing.html
对于带有单元测试的渴望模式,这里引用了实际文档的引述:
渴望模式
task_always_eager设置启用的渴望模式由定义不适合单元测试。
使用渴望模式进行测试时,您只是在测试对什么的仿真发生在工人身上,两者之间存在许多差异仿真和现实中发生的事情。
另一种选择是模拟任务,如果你不需要运行它的副作用。
from unittest import mock
@mock.patch('module.module.task')
def test_name(self, mock_task): ...