我想在芹菜链命令中使用块。
chain = task1.s(arg1) | task2.chunks(?,CHUNK_SIZE) | task3.chunks(?, CHUNK_SIZE)
基本上,我想做的是运行task1,将其结果分块并将块发送到task2,然后task2应该调用task3,task3也应该从task2接收分块结果以完成该过程。为什么?因为任务 1 和任务 2 都可以返回相当数量的项目,我想在更多批次中处理这些项目。
上面的代码不起作用,因为我不太确定要放什么而不是问号才能使其工作。
我不太确定这是否可能,因为搜索没有提供太多结果,因此在无法构建此类工作流程的情况下,我会对合理的替代方案感兴趣。
我不确定对于现有的原语是否可能
。我可以考虑是否有两种选择/解决方法:
使用块/和弦从任务中启动新任务。
你一定已经想到了这个。这个想法是使用
apply_async
正常调用 task1 。一旦该任务完成生成需要分块的大量输出,只需使用块原语进一步为 task2 创建块。同样,对任务 2 和任务 3 之间的转换执行相同的步骤。仅当您最终等待获取内部任务的结果时,从任务内部调用任务才是一个坏主意。因此,请记住,如果您正在等待任务结果,则不建议使用此方法。@task def task1(some_input): # Do stuff # Create a list of lists where the inner list represent the *args to send to an individual task task2.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async() @task def task2(a, b): # Do stuff # Create a list of lists where the inner list represent the *args to send to an individual task task3.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async() @task def task3(a, b): # Do stuff
这个解决方案有点有趣。我在芹菜 Github 问题页面上遇到了一个特定的请求。看看这个来自史蒂夫的拉取请求:https://github.com/celery/celery/pull/817据我所知,他创建了一个动态任务装饰器(关于名称是否应该如此存在争议),它了解任务是否返回子任务。如果是这样,它将首先应用该子任务。他声称他在Veezio的生产中成功地使用了它。我自己还没有尝试过。我建议转到该线程并提出几个问题。甚至在Twitter或IRC或其他什么地方纠缠Steeve。