我想为不同的任务创建多个队列。例如,emailqueue
发送电子邮件或pipedrive
队列以将任务与pipedrive API
同步,这样email
就不必等到所有pipedrives
都同步,反之亦然。
我是路由新手,我尝试了两种方法来创建队列,但似乎都没有工作。
-
这是首选方法。我试图在装饰器内部定义队列
@task
@task(bind=True, queue='pipedrivequeue')
def backsync_lead(self,lead_id):
-
settings.py
CELERY_ROUTES = { # tried CELERY_TASK_ROUTES too 'pipedrive.tasks.*': {'queue': 'pipedrivequeue'}, ... }
在这两种情况下,当我手动运行celery worker
时,我只看到一个默认celery
队列。
(project) milano@milano-PC:~/PycharmProjects/project$ celery -A project.celery worker -l info
-------------- celery@milano-PC v4.2.2 (windowlicker)
---- **** -----
--- * *** * -- Linux-4.15.0-47-generic-x86_64-with-Ubuntu-18.04-bionic 2019-04-12 17:17:05
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: project:0x7f3b47f66cf8
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. project.apps.apis.pipedrive.tasks.backsync_all_stages
. project.apps.apis.pipedrive.tasks.backsync_lead
正如你在这一行中看到的:
-------------- [queues]
.> celery exchange=celery(direct) key=celery
可能只有一个队列。我只想将此队列用于未指定队列的任务。
你知道问题出在哪里吗?
编辑
(project) milano@milano-PC:~/PycharmProjects/peoject$ celery inspect active_queues
Error: No nodes replied within time constraint.
你需要运行一个显式命名队列的worker,然后django将能够馈送到该队列中;
celery worker -A project.celery -l info # Default queue worker
celery worker -A project.celery -l info -Q pipedrivequeue # pipedrivequeue worker
celery worker -A project.celery -l info -Q testqueue # testqueue worker