正确模拟在另一个芹菜任务中调用的芹菜任务



如何正确模拟在另一个芹菜任务中调用的芹菜任务?(下面的虚拟代码(

@app.task
def task1(smthg):
do_so_basic_stuff_1
do_so_basic_stuff_2
other_thing(smthg)
@app.task
def task2(smthg):
if condition:
task1.delay(smthg[1])
else:
task1.delay(smthg)

我在 my_module proj/cel/my_module.py 中确实有完全相同的代码结构 我正在尝试在 proj/tests/cel_test/test 中编写测试.py

测试功能:

def test_this_thing(self):
# firs I want to mock task1
# i've tried to import it from my_module.py to test.py and then mock it from test.py namespace 
# i've tried to import it from my_module.py and mock it
# nothing worked for me
# what I basically want to do 
# mock task1 here
# and then run task 2 (synchronous)
task2.apply()
# and then I want to check if task one was called 
self.assertTrue(mocked_task1.called)

你不是在调用task1()task2(),而是在调用它们的方法:delay()apply()- 所以你需要测试这些方法是否被调用。

这是我刚刚根据您的代码编写的一个工作示例:

tasks.py

from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task
def task1():
return 'task1'
@app.task
def task2():
task1.delay()

test.py

from tasks import task2
def test_task2(mocker):
mocked_task1 = mocker.patch('tasks.task1')
task2.apply()
assert mocked_task1.delay.called

测试结果:

$ pytest -vvv test.py
============================= test session starts ==============================
platform linux -- Python 3.5.2, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- /home/kris/.virtualenvs/3/bin/python3
cachedir: .cache
rootdir: /home/kris/projects/tmp, inifile:
plugins: mock-1.6.2, celery-4.1.0
collected 1 item                                                                
test.py::test_task2 PASSED
=========================== 1 passed in 0.02 seconds ===========================

首先,测试芹菜任务可能非常困难。我通常将所有逻辑放入一个不是任务的函数中,然后创建一个仅调用该函数的任务,以便您可以正确测试逻辑。

其次,我认为你不想在任务中调用任务(不确定,但我相信通常不建议这样做(。相反,根据您的需求,您可能应该链接或分组:

http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives

最后,为了回答您的实际问题,您需要在代码中准确修补delay方法,如本文中所述。

最新更新