如何使用Celery、RabbitMQ和Django确保每个用户的任务执行顺序



我正在运行Django、Celery和RabbitMQ。我试图实现的是确保与一个用户相关的任务按顺序执行(特别是,一次一个,我不希望每个用户的任务并发)

  • 无论何时为用户添加新任务,都应该取决于最近添加的任务。如果此类型的任务已为此用户排队并且尚未启动,则附加功能可能包括不将任务添加到队列中

我做了一些研究,并且:

  • 我在Celery中找不到将新创建的任务与已经排队的任务链接起来的方法,链似乎只能链接新任务
  • 我认为这两种功能都可以通过自定义RabbitMQ消息处理程序来实现,尽管这可能很难编码
  • 我也读过关于celener任务树的文章,这可能是确保执行顺序的最简单方法,但我如何将新任务与已经"applied_async"的任务树或队列链接起来?有什么方法可以让我使用这个软件包实现额外的无重复功能吗

编辑:芹菜食谱中也有这个"锁"的例子,因为这个概念很好,我看不到在我的情况下让它按预期工作的可能方法——简单地说,如果我不能为用户获取锁,就必须重试任务,但这意味着要把它推到队列的末尾。

这里最好的做法是什么?

如果您配置了celener工作程序,使它们一次只能执行一个任务(请参阅worker_concurrency设置),那么您可以在每个用户的基础上强制执行所需的并发性。使用类似的方法

NUMBER_OF_CELERY_WORKERS = 10
def get_task_queue_for_user(user):
return "user_queue_{}".format(user.id % NUMBER_OF_CELERY_WORKERS)

为了根据用户id获得任务队列,每个任务都将分配给每个用户的同一队列。需要将工作人员配置为仅使用单个任务队列中的任务。

结果是这样的:

  1. 用户49触发任务

  2. 任务发送到user_queue_9

  3. 当正在侦听user_queue_9的一个且唯一的芹菜工作者准备使用新任务时,该任务被执行

这是一个棘手的答案,因为

  • 每个队列只需要一个芹菜工作者是一个脆弱的系统——如果芹菜工作者停止,整个队列就会停止

  • 工人们正在低效地运行

相关内容

  • 没有找到相关文章

最新更新