我刚开始在Django项目中使用芹菜,有点陷入了这个特殊的问题:基本上,我需要将一个长期运行的任务分配给不同的工作人员。这项任务实际上分为几个步骤,每个步骤都需要相当长的时间才能完成。因此,如果某个步骤失败,我希望芹菜使用同一个worker重试此任务,以重用已完成步骤的结果。我知道芹菜使用路由将任务分配到特定的服务器,但我找不到任何关于这个特定问题的信息。我使用RabbitMQ作为我的经纪人。
您可以让每个celeryd实例从以工作进程的主机名命名的队列中消费:
celeryd -l info -n worker1.example.com -Q celery,worker1.example.com
将主机名设置为worker1.example.com
,并将从名称相同的队列以及默认队列(名称为celery
)中使用。
然后,要将任务定向给特定的工作人员,您可以使用:
task.apply_async(args, kwargs, queue="worker1.example.com")
类似于引导重试:
task.retry(queue="worker1.example.com")
或者将重试定向到同一个工作者:
task.retry(queue=task.request.hostname)