我正在运行Django、Celery和RabbitMQ。我试图实现的是确保与一个用户相关的任务按顺序执行(特别是,一次一个,我不希望每个用户的任务并发)
- 无论何时为用户添加新任务,都应该取决于最近添加的任务。如果此类型的任务已为此用户排队并且尚未启动,则附加功能可能包括不将任务添加到队列中
我做了一些研究,并且:
- 我在Celery中找不到将新创建的任务与已经排队的任务链接起来的方法,链似乎只能链接新任务
- 我认为这两种功能都可以通过自定义RabbitMQ消息处理程序来实现,尽管这可能很难编码
- 我也读过关于celener任务树的文章,这可能是确保执行顺序的最简单方法,但我如何将新任务与已经"
applied_async
"的任务树或队列链接起来?有什么方法可以让我使用这个软件包实现额外的无重复功能吗
编辑:芹菜食谱中也有这个"锁"的例子,因为这个概念很好,我看不到在我的情况下让它按预期工作的可能方法——简单地说,如果我不能为用户获取锁,就必须重试任务,但这意味着要把它推到队列的末尾。
这里最好的做法是什么?
如果您配置了celener工作程序,使它们一次只能执行一个任务(请参阅worker_concurrency设置),那么您可以在每个用户的基础上强制执行所需的并发性。使用类似的方法
NUMBER_OF_CELERY_WORKERS = 10
def get_task_queue_for_user(user):
return "user_queue_{}".format(user.id % NUMBER_OF_CELERY_WORKERS)
为了根据用户id获得任务队列,每个任务都将分配给每个用户的同一队列。需要将工作人员配置为仅使用单个任务队列中的任务。
结果是这样的:
用户49触发任务
任务发送到
user_queue_9
当正在侦听
user_queue_9
的一个且唯一的芹菜工作者准备使用新任务时,该任务被执行
这是一个棘手的答案,因为
每个队列只需要一个芹菜工作者是一个脆弱的系统——如果芹菜工作者停止,整个队列就会停止
工人们正在低效地运行