为什么Celery只运行链中的第一个任务



我有一个工作链,但当我添加apply_async()时,它只执行第一个任务。

@task(name='run_a', delay=True)
def run_a(**kwargs):
    do_whatever(kwarg['var'])
    return
@task(name='run_b', delay=True)
def run_b(**kwargs):
    # ...
    return
@task(name='run_c', delay=True)
def run_c(**kwargs):
    # ...
    return

使用链式命令:

ret = chain(
    run_a.s(**kwargs),
    run_b.s(**kwargs),
    run_b.s(**kwargs)
).apply_async()
  • 如果没有apply_async,一切都会按预期(同步)工作
  • "夸"是一句格言

基于文档http://docs.celeryproject.org/en/master/userguide/canvas.html#chains:将应用链接任务,并将其父任务的结果作为第一个参数。因此,为了强制下一个链接的任务不使用父结果作为参数,我们必须使用.si()快捷方式使任务不可变。因此,我们必须按照重新编写链

In [29]: ret = chain(
    ...:     run_a.si(**kwargs),
    ...:     run_b.si(**kwargs),
    ...:     run_c.si(**kwargs)
    ...: ).apply_async()

结果

In [30]: print ret.parent.parent.graph
0e1541f8-93c2-48c9-95b0-7a0a5971d74a(1)
     7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)
7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)
70a6e66c-1ef9-4814-ae23-9c905ee1fcd5(2)
     0e1541f8-93c2-48c9-95b0-7a0a5971d74a(1)
          7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)

当Celery在执行前验证任务时,函数需要*args**kwargs才能工作。

# Kwargs was filled, I added an empty args list
args = []
kwargs = {
    'some': 'intelligent data',
    }

当同时调用这两个函数时,它会按预期工作:

ret = chain(
    run_a.s(*args, **kwargs),
    run_b.s(*args, **kwargs),
    run_b.s(*args, **kwargs)
).apply_async()

相关内容

  • 没有找到相关文章

最新更新