celery_tasktree (https://pypi.python.org/pypi/celery-tasktree( 与 Celery 工作流调度程序 (http://docs.celeryproject.org/en/latest/userguide/canvas.html( 相比,提供了更干净的工作流画布。但是,它仅支持树状工作流结构,不支持常规的类似 DAG 的工作流。芹菜工作流程确实有"和弦"方法,但使用起来似乎很麻烦。
是否有任何其他类似于 celery_tasktree 的基于芹菜的库适用于常规 DAG 工作流?
下面是几个支持基于 DAG 的作业计划程序的库。
https://github.com/thieman/dagobah
https://github.com/apache/incubator-airflow
它们不是基于芹菜。但是,您可以在 celery 中创建自己的基元来转发可用于生成 DAG 作业计划程序的结果。
@app.task(bind=True)
def forward(self, result, sig):
# convert JSON serialized signature back to Signature
sig = self.app.signature(sig)
# get the return value of the provided task signature
result2 = sig()
# next task will receive tuple of (result_A, result_B)
return (result, result2)
@app.task
def C(self, Ares_Bres)):
Ares, Bres = Ares__Bres
return Ares + Bres
workflow = (A.s() | forward.s(B.s()) | C.s())
有关更详细的讨论,请参阅此处。