我有一个python大数据项目,我试图在其中使用芹菜和Redis服务器来完成任务。问题是,我需要三个不同的队列来完成三个不同任务,我正在将这些任务应用到芹菜上
这是我为同时运行这三个任务所做的配置,但它们使用一个单独的队列一个接一个地执行任务,因此需要花费大量时间。
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery("tasks", broker="redis://localhost:6379")
@app.task()
def main(capital,gamma,alpha):
@app.task()
def gain(capital,gamma,alpha):
@app.task()
def lain(capital,gamma,alpha):
为了启动芹菜应用程序,我使用了以下几行代码
("celery -A task worker --loglevel=info -P eventlet --concurrency=10 -n worker1@%h" , shell=True)
("celery -A task worker --loglevel=info -P eventlet --concurrency=10 -n worker2@%h" , shell=True)
("celery -A task worker --loglevel=info -P eventlet --concurrency=10 -n worker3@%h" , shell=True)
这些代码完美地用三个任务运行我的应用程序,但我需要为每个任务创建三个独立的队列,以便所有三个任务都可以同时或并行运行。
这就是我的芹菜应用程序的三个任务的样子,但它只使用了一个队列,即名为芹菜的默认队列。
-------------- worker1@EC2AMAZ-RTM8UD8 v5.2.3 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.20348-SP0 2022-03-10 12:59:07
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x248adf81e80
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 10 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. task.gain
. task.lain
. task.main
[2022-03-10 12:59:07,461: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-03-10 12:59:07,477: INFO/MainProcess] mingle: searching for neighbors
[2022-03-10 12:59:08,493: INFO/MainProcess] mingle: all alone
[2022-03-10 12:59:08,493: INFO/MainProcess] pidbox: Connected to redis://localhost:6379//.
[2022-03-10 12:59:08,493: INFO/MainProcess] worker1@EC2AMAZ-RTM8UD8 ready.
因此,请有人帮我在芹菜应用程序中为三个不同的任务定义三个不同队列,如果有任何帮助,我们将不胜感激:(
它应该很简单,只需将-Q <queue name>
添加到您运行的三个Celery工作程序中即可。