具有多个队列的 Python 分布式任务



所以我正在从事的项目需要一个分布式任务系统来处理CPU密集型任务。这是相对简单的,旋转芹菜并将所有任务放在队列中,让芹菜完成其余的工作。

我遇到的问题是每个用户都需要自己的队列,并且每个用户队列中的项目必须同步处理。因此,用户队列中已经有一个任务正在处理,请等到它完成后再允许工作人员拾取下一个任务。

我最接近这样的事情是拥有一组固定的队列,并将它们分配给用户。然后,将芹菜工人挑选的用户任务固定到并发性为 1 的特定队列。

此系统的问题在于,我无法扩展我的工作线程来处理积压的用户任务。

有没有办法配置芹菜做我想做的事,或者可能存在另一个任务系统可以做我想做的事?

编辑:

目前,我使用以下命令在一组固定队列上生成并发为 1 的芹菜工作线程

celery multi start 4 -A app.celery -Q:1 queue_1 -Q:2 queue_2 -Q:3 queue_3 -Q:4 queue_4 --logfile=celery.log --concurrency=1

然后,我将队列名称存储在用户对象上,当用户启动进程时,我将任务排队到存储在用户对象上的队列中。这给了我同步任务。

缺点是当我有多个用户共享队列时,导致任务累积并且永远不会被处理。

我想说 5 个工作人员,每个用户对象都有一个队列。然后让工作人员跳过队列,但一次在一个队列上不要有超过 1 个工作人员。

我在这里使用chaindoc条件按特定顺序执行任务:

chain = task1_task.si(account_pk) | task2_task.si(account_pk) | task3_task.si(account_pk)
chain()

因此,我为特定用户任务 1 执行,完成后执行任务 2,完成后执行任务 3。 它将在任何可用的工人中生成:)

对于中途停止链条:

self.request.callbacks = None
return

并且不要忘记绑定您的任务:

@app.task(bind=True)
def task2_task(self, account_pk):

相关内容

  • 没有找到相关文章

最新更新