和弦不等待所有子任务

  • 本文关键字:子任务 等待 celery
  • 更新时间 :
  • 英文 :


我有几个这样的任务:

@celery.task
def generate():
    sleep(1.0)
    print "Generate done!"
    return 'result'
@celery.task
def lower(result):
    sleep(1.0)
    print "Lower done!"
    return result.lower()
@celery.task
def upper(result):
    sleep(1.0)
    print "Upper done!"
    return result.upper()
@celery.task
def upload(result):
    sleep(1.0)
    print "Upload done for: %s!" % (result,)
    return 'upload'
@celery.task
def callback(results):
    print "It's all done! %s" % (results,)

我正在创建一个看起来像这样的和弦:

chord(
    header=chain(
        generate.s(),
        group(
            chain(lower.s(), upload.s()),
            chain(upper.s(), upload.s())
        )
    ), body=callback.s()
).delay()

我遇到的问题是我的回调应该在所有任务完成后触发,似乎在generate后立即触发。

如果不清楚,工作流程是这样的:

  1. 生成一个结果,然后将其结果传递给组的成员,从而实现并行性:
    1. 第一组将从generate中获取结果,使用 lower 将其转换为小写,然后使用 upload 上传结果。
    2. 第二组将从generate中获取结果,使用upper将其转换为大写,然后使用"upload"上传结果。
  2. 完成所有这些操作后,应调用callback任务回调。

预期

callback任务将在启动后至少 3 秒调用。

实际

callback任务在启动后大约 1 秒调用,并且不会等待组成员完成执行。

以下是证明它不会等待组的日志:

[2013-11-17 18:20:40,447: WARNING/PoolWorker-8] Generate done!
[2013-11-17 18:20:41,493: WARNING/PoolWorker-6] Upper done!
[2013-11-17 18:20:41,493: WARNING/PoolWorker-1] Lower done!
[2013-11-17 18:20:41,535: WARNING/PoolWorker-6] It's all done! [('e0016a35-d538-4e96-ad86-6ddf91ef4a09', [('b1af78a9-7935-4037-84e4-9fae6d7c027e', None), ('d69c4c99-af9c-476f-af7d-7f647c4d9c83', None)])]
[2013-11-17 18:20:42,522: WARNING/PoolWorker-7] Upload done for: result!
[2013-11-17 18:20:42,523: WARNING/PoolWorker-5] Upload done for: RESULT!

看来芹菜不等群。有没有办法让芹菜等到所有任务(包括组成员)都执行完毕?

您在此处使用链作为和弦标题,但标头必须是一个组:

chord(
    header=chain(
        generate.s(),
        group(
            chain(lower.s(), upload.s()),
            chain(upper.s(), upload.s())
        )
    ), body=callback.s()
).delay()

有了chain(generate.s(), group(...)没有什么可以同步的,因为组并行发生。

您的工作流程可以更好地表达如下:

filters = group(lower.s() | upload.s(),
                upper.s() | upload.s())
result = (generate.s() | filters | callback.s())()

注意:chain(group, sig)会自动转换为和弦

相关内容

  • 没有找到相关文章

最新更新