使用芹菜进行shift的任务链



考虑以下两步算法:

Iteration 1: A1 A2 A3
Iteration 2:    B2 B3

任务B{i}依赖于任务A{i-1}A{i}

问题是如何用芹菜执行这个工作流,使:

  • 没有A任务执行两次。
  • 每个B任务可以在它所依赖的A任务完成后立即启动?

我尝试了以下选项,但没有一个具有两个属性。

选项1:两组

我已经像这样把两个迭代分成两个独立的组。

result = group([A.s(1), A.s(2), A.s(3)]) | group([B.s(2), B.s(3)])

这个执行的问题是,具有A任务的整个组需要在B组启动之前完成。这将导致期望的结果,但不是资源的最佳利用。

选项2:和弦

result = group([
chord([A.s(1), A.s(2)], B.s(2)),
chord([A.s(2), A.s(3)], B.s(3))
])

这里的问题是A.s(2)被调用两次。我可以在我的应用程序内部管理这个,但这需要分布式锁,以及更仔细地处理已经完成的和需要完成的。不能执行两次A.s(2)。任务是幂等的,但是执行时间太长。

您可以在chord本身尝试类似的操作:

@shared_task(name='notify_complete')
def notify_complete(*args, extra_id):
# get list of result from a1,a2,a3 from args
# execute b1, b2
# extra_id: just in case you want to pass any var. you can ignore it.
pass

chord([A.s(1), A.s(2), A.s(3)], notify_complete.s(extra_id=extra_id))

相关内容

  • 没有找到相关文章

最新更新