如何在芹菜中为多个任务分配多个队列



我有一个python大数据项目,我试图在其中使用芹菜和Redis服务器来完成任务。问题是,我需要三个不同的队列来完成三个不同任务,我正在将这些任务应用到芹菜上

这是我为同时运行这三个任务所做的配置,但它们使用一个单独的队列一个接一个地执行任务,因此需要花费大量时间。

from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery("tasks", broker="redis://localhost:6379")
@app.task()
def main(capital,gamma,alpha):
@app.task()
def gain(capital,gamma,alpha):
@app.task()
def lain(capital,gamma,alpha):

为了启动芹菜应用程序,我使用了以下几行代码

("celery -A task worker --loglevel=info -P eventlet --concurrency=10 -n worker1@%h" , shell=True)
("celery -A task worker --loglevel=info -P eventlet --concurrency=10 -n worker2@%h" , shell=True)
("celery -A task worker --loglevel=info -P eventlet --concurrency=10 -n worker3@%h" , shell=True)

这些代码完美地用三个任务运行我的应用程序,但我需要为每个任务创建三个独立的队列,以便所有三个任务都可以同时或并行运行。

这就是我的芹菜应用程序的三个任务的样子,但它只使用了一个队列,即名为芹菜的默认队列。

-------------- worker1@EC2AMAZ-RTM8UD8 v5.2.3 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.20348-SP0 2022-03-10 12:59:07
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x248adf81e80
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 10 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
. task.gain
. task.lain
. task.main
[2022-03-10 12:59:07,461: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-03-10 12:59:07,477: INFO/MainProcess] mingle: searching for neighbors
[2022-03-10 12:59:08,493: INFO/MainProcess] mingle: all alone
[2022-03-10 12:59:08,493: INFO/MainProcess] pidbox: Connected to redis://localhost:6379//.
[2022-03-10 12:59:08,493: INFO/MainProcess] worker1@EC2AMAZ-RTM8UD8 ready.

因此,请有人帮我在芹菜应用程序中为三个不同的任务定义三个不同队列,如果有任何帮助,我们将不胜感激:(

它应该很简单,只需将-Q <queue name>添加到您运行的三个Celery工作程序中即可。

最新更新