我有一个工作链,但当我添加apply_async()
时,它只执行第一个任务。
@task(name='run_a', delay=True)
def run_a(**kwargs):
do_whatever(kwarg['var'])
return
@task(name='run_b', delay=True)
def run_b(**kwargs):
# ...
return
@task(name='run_c', delay=True)
def run_c(**kwargs):
# ...
return
使用链式命令:
ret = chain(
run_a.s(**kwargs),
run_b.s(**kwargs),
run_b.s(**kwargs)
).apply_async()
- 如果没有
apply_async
,一切都会按预期(同步)工作 - "夸"是一句格言
基于文档http://docs.celeryproject.org/en/master/userguide/canvas.html#chains:将应用链接任务,并将其父任务的结果作为第一个参数。因此,为了强制下一个链接的任务不使用父结果作为参数,我们必须使用.si()快捷方式使任务不可变。因此,我们必须按照重新编写链
In [29]: ret = chain(
...: run_a.si(**kwargs),
...: run_b.si(**kwargs),
...: run_c.si(**kwargs)
...: ).apply_async()
结果
In [30]: print ret.parent.parent.graph
0e1541f8-93c2-48c9-95b0-7a0a5971d74a(1)
7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)
7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)
70a6e66c-1ef9-4814-ae23-9c905ee1fcd5(2)
0e1541f8-93c2-48c9-95b0-7a0a5971d74a(1)
7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)
当Celery在执行前验证任务时,函数需要*args
和**kwargs
才能工作。
# Kwargs was filled, I added an empty args list
args = []
kwargs = {
'some': 'intelligent data',
}
当同时调用这两个函数时,它会按预期工作:
ret = chain(
run_a.s(*args, **kwargs),
run_b.s(*args, **kwargs),
run_b.s(*args, **kwargs)
).apply_async()