所以我正在从事的项目需要一个分布式任务系统来处理CPU密集型任务。这是相对简单的,旋转芹菜并将所有任务放在队列中,让芹菜完成其余的工作。
我遇到的问题是每个用户都需要自己的队列,并且每个用户队列中的项目必须同步处理。因此,用户队列中已经有一个任务正在处理,请等到它完成后再允许工作人员拾取下一个任务。
我最接近这样的事情是拥有一组固定的队列,并将它们分配给用户。然后,将芹菜工人挑选的用户任务固定到并发性为 1 的特定队列。
此系统的问题在于,我无法扩展我的工作线程来处理积压的用户任务。
有没有办法配置芹菜做我想做的事,或者可能存在另一个任务系统可以做我想做的事?
编辑:
目前,我使用以下命令在一组固定队列上生成并发为 1 的芹菜工作线程
celery multi start 4 -A app.celery -Q:1 queue_1 -Q:2 queue_2 -Q:3 queue_3 -Q:4 queue_4 --logfile=celery.log --concurrency=1
然后,我将队列名称存储在用户对象上,当用户启动进程时,我将任务排队到存储在用户对象上的队列中。这给了我同步任务。
缺点是当我有多个用户共享队列时,导致任务累积并且永远不会被处理。
我想说 5 个工作人员,每个用户对象都有一个队列。然后让工作人员跳过队列,但一次在一个队列上不要有超过 1 个工作人员。
我在这里使用chain
doc条件按特定顺序执行任务:
chain = task1_task.si(account_pk) | task2_task.si(account_pk) | task3_task.si(account_pk)
chain()
因此,我为特定用户任务 1 执行,完成后执行任务 2,完成后执行任务 3。 它将在任何可用的工人中生成:)
对于中途停止链条:
self.request.callbacks = None
return
并且不要忘记绑定您的任务:
@app.task(bind=True)
def task2_task(self, account_pk):