我是Celery的新手,我正在努力了解它是否能解决我的问题。
我需要启动一些任务(An
),然后在这些任务完成后运行另一个任务(B
)。问题是任务An
是按顺序添加的,我不想在开始第一个任务之前等待最后一个任务的添加。我可以将任务B
配置为在任务An
完成后执行吗?
现在进入真实场景:
- 任务
An
-处理用户上传的文件(在每个文件上传) - 任务
B
-使用处理所有已上载的文件
也欢迎替代解决方案
为了确保您可以做到这一点,celenicanvas支持许多选项,包括您需要的行为、在一组任务之后运行任务。。。它被称为"和弦",例如:
from celery import chord
from tasks import task_upload1, task_upload2, task_upload3, final_execution
result = chord(task_upload1.s(), task_upload2.s(), task_upload3.s())(final_execution.s())
get_required_result = result.get()
您可以参考此链接了解更多详细信息
使用RabbitMQ,您可以使用消息确认和聚合器模式获得准确的行为。
您启动worker,它消耗消息(A
)并做一些工作(在您的情况下,处理用户上传的文件),但完成后不发送ack
。相反,它从队列中获取下一条消息,若再次是A
任务,他也在做同样的事情。在某个时刻,他将接收任务B
,并可以处理之前A
的所有结果,所有结果,并将ack
发送给所有结果。
不幸的是,这种情况无法用Celery完成,因为您必须在创建时间时指定所有A
任务和最终B
任务(链、和弦、回调等)。
或者,您可以将每个成功的A
任务的Task.id
保存在单独的队列中(而不是Celery队列),并在执行B
任务时处理此消息。芹菜可以适应这种算法。