考虑以下tasks.py
模块(改编自 http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#first-steps):
import logging
import sys
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
logging.info(f"Adding {x} and {y}...")
return x + y
def call_add(x, y):
add.delay(x, y)
在同一个目录中,我有一个test_tasks.py
测试模块,它读取
from unittest.mock import patch
import tasks
@patch('logging.info')
def test_adder(info_mock):
tasks.call_add(1, 2)
info_mock.assert_not_called()
这个测试通过了(如果我用pytest test_tasks.py
运行它),但我不确定为什么没有调用info_mock
?我希望以下断言能够通过
info_mock.assert_called_with("Adding 1 and 2...")
为什么在此示例中不通过tasks.call_add()
调用logging.info
?在我看来,这相当于 http://docs.celeryproject.org/en/latest/userguide/testing.html 中给出的例子。
确保在运行单元测试时直接在同一进程中运行测试。
Celery 使得在"同步"运行任务时保持相同的 API 并跳过损坏/工作器部分变得非常简单。
app = Celery('tasks', broker='pyamqp://guest@localhost//', task_always_eager=True)
http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-always-eager