可以问你一个关于芹菜的问题吗?
我有不同的作家,每X分钟写一个任务。每个任务都需要完成来自同一个编写器的任务。系统运行良好,如X分钟>>几秒钟即可完成任务。
但是,现在,可能会发生这样的情况:作者同时发送两到三个任务。显然,Celery+RabbitMQ会将这些任务分配给不同的工作人员,从而制造麻烦。
我已经搜索过了,但我发现了关于用锁阻止一个工作者直到另一个工作者完成的响应(例如使用Redis),但这是不可能的,因为我的工作者比作者少。
我需要类似N个队列的东西来容纳N个写入程序,并且Celery能够理解每个队列中的顺序。我会有成千上万的作家,所以我不能创造这么多工人。
示例:A B C编写器,A1,A2…任务,并且只有一个工人
我在"同一"时间收到A1、A2、B1、C1、B2、C2、A3、B3、C3
Celery应该创建队列A(1-2-3)B(1-2-3)C(1-2-3)
然后,发送任务A1,接下来,如果是A2、B1、C1并不重要,但不应该是A3、B2、B3、C2、C3。
希望我解释好
谢谢!
我认为您需要为每个队列创建一个工作者来强制执行这样的排序。否则,工作人员将只使用先进先出的方法来处理任务。您可以创建任意数量的队列,并配置每个工作人员接收消息的队列。您可以在启动worker时传递-Q
参数以设置其队列,如Workers Guide中所述。
celery -A my_project worker -l info -Q A
然后,您可以使用"路由指南"设置全局映射,定义每个任务要转到的队列。
CELERY_ROUTES = {
'my_app.tasks.task_a1': {'queue': 'A'},
'my_app.tasks.task_a2': {'queue': 'A'},
'my_app.tasks.task_b1': {'queue': 'B'},
'my_app.tasks.task_c1': {'queue': 'C'},
}
或者,您可以在提交每个任务实例时根据《调用任务指南》指定队列。
task_a1.apply_async(queue='A')