考虑以下两步算法:
Iteration 1: A1 A2 A3
Iteration 2: B2 B3
任务B{i}
依赖于任务A{i-1}
和A{i}
。
问题是如何用芹菜执行这个工作流,使:
- 没有
A
任务执行两次。 - 每个
B
任务可以在它所依赖的A
任务完成后立即启动?
我尝试了以下选项,但没有一个具有两个属性。
选项1:两组
我已经像这样把两个迭代分成两个独立的组。
result = group([A.s(1), A.s(2), A.s(3)]) | group([B.s(2), B.s(3)])
这个执行的问题是,具有A
任务的整个组需要在B
组启动之前完成。这将导致期望的结果,但不是资源的最佳利用。
选项2:和弦
result = group([
chord([A.s(1), A.s(2)], B.s(2)),
chord([A.s(2), A.s(3)], B.s(3))
])
这里的问题是A.s(2)
被调用两次。我可以在我的应用程序内部管理这个,但这需要分布式锁,以及更仔细地处理已经完成的和需要完成的。不能执行两次A.s(2)
。任务是幂等的,但是执行时间太长。
您可以在chord本身尝试类似的操作:
@shared_task(name='notify_complete')
def notify_complete(*args, extra_id):
# get list of result from a1,a2,a3 from args
# execute b1, b2
# extra_id: just in case you want to pass any var. you can ignore it.
pass
chord([A.s(1), A.s(2), A.s(3)], notify_complete.s(extra_id=extra_id))