我有几个这样的任务:
@celery.task
def generate():
sleep(1.0)
print "Generate done!"
return 'result'
@celery.task
def lower(result):
sleep(1.0)
print "Lower done!"
return result.lower()
@celery.task
def upper(result):
sleep(1.0)
print "Upper done!"
return result.upper()
@celery.task
def upload(result):
sleep(1.0)
print "Upload done for: %s!" % (result,)
return 'upload'
@celery.task
def callback(results):
print "It's all done! %s" % (results,)
我正在创建一个看起来像这样的和弦:
chord(
header=chain(
generate.s(),
group(
chain(lower.s(), upload.s()),
chain(upper.s(), upload.s())
)
), body=callback.s()
).delay()
我遇到的问题是我的回调应该在所有任务完成后触发,似乎在generate
后立即触发。
如果不清楚,工作流程是这样的:
- 生成一个结果,然后将其结果传递给组的成员,从而实现并行性:
- 第一组将从
generate
中获取结果,使用lower
将其转换为小写,然后使用upload
上传结果。 - 第二组将从
generate
中获取结果,使用upper
将其转换为大写,然后使用"upload"上传结果。
- 第一组将从
- 完成所有这些操作后,应调用
callback
任务回调。
预期
callback
任务将在启动后至少 3 秒调用。
实际
callback
任务在启动后大约 1 秒调用,并且不会等待组成员完成执行。
以下是证明它不会等待组的日志:
[2013-11-17 18:20:40,447: WARNING/PoolWorker-8] Generate done!
[2013-11-17 18:20:41,493: WARNING/PoolWorker-6] Upper done!
[2013-11-17 18:20:41,493: WARNING/PoolWorker-1] Lower done!
[2013-11-17 18:20:41,535: WARNING/PoolWorker-6] It's all done! [('e0016a35-d538-4e96-ad86-6ddf91ef4a09', [('b1af78a9-7935-4037-84e4-9fae6d7c027e', None), ('d69c4c99-af9c-476f-af7d-7f647c4d9c83', None)])]
[2013-11-17 18:20:42,522: WARNING/PoolWorker-7] Upload done for: result!
[2013-11-17 18:20:42,523: WARNING/PoolWorker-5] Upload done for: RESULT!
看来芹菜不等群。有没有办法让芹菜等到所有任务(包括组成员)都执行完毕?
您在此处使用链作为和弦标题,但标头必须是一个组:
chord(
header=chain(
generate.s(),
group(
chain(lower.s(), upload.s()),
chain(upper.s(), upload.s())
)
), body=callback.s()
).delay()
有了chain(generate.s(), group(...)
没有什么可以同步的,因为组并行发生。
您的工作流程可以更好地表达如下:
filters = group(lower.s() | upload.s(),
upper.s() | upload.s())
result = (generate.s() | filters | callback.s())()
注意:chain(group, sig)
会自动转换为和弦