我在使用 current_app.send_task 时遇到 Celery 队列路由问题
我有两个工作人员(每个队列每个工作人员)
python manage.py celery worker -E -Q priority --concurrency=8 --loglevel=DEBUG
python manage.py celery worker -Q low --concurrency=8 -E -B --loglevel=DEBUG
我在文件中定义了两个队列 celeryconfig.py:
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.core.exceptions import ImproperlyConfigured
from celery import Celery
from django.conf import settings
try:
app = Celery('proj', broker=getattr(settings, 'BROKER_URL', 'redis://'))
except ImproperlyConfigured:
app = Celery('proj', broker='redis://')
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],
CELERY_RESULT_SERIALIZER='json',
CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
CELERY_DEFAULT_EXCHANGE='tasks',
CELERY_DEFAULT_EXCHANGE_TYPE='topic',
CELERY_DEFAULT_ROUTING_KEY='task.priority',
CELERY_QUEUES=(
Queue('priority',routing_key='priority.#'),
Queue('low', routing_key='low.#'),
),
CELERY_DEFAULT_EXCHANGE='priority',
CELERY_IMPORTS=('mymodule.tasks',)
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'UTC'
if __name__ == '__main__':
app.start()
在任务的定义中,我们使用装饰器来显式队列:
@task(name='mymodule.mytask', routing_key='low.mytask', queue='low')
def mytask():
# does something
pass
当使用以下命令运行此任务时,此任务确实在低队列中运行:
from mymodule.tasks import mytask
mytask.delay()
但是当它使用以下命令运行时,情况并非如此:(它在默认队列中运行:"优先级")
from celery import current_app
current_app.send_task('mymodule.mytask')
我想知道为什么后面的方法没有将任务路由到"低"队列!
p.s:我用的是雷迪斯。
send_task是一种
低级方法。它直接向代理发送任务签名,而无需通过任务装饰器。使用此方法,您甚至可以在不加载任务代码/模块的情况下发送任务。
要解决您的问题,您可以直接从配置中获取routing_key/队列:
route = celery.amqp.routes[0].route_for_task("mymodule.mytask")
Out[10]: {'queue': 'low', 'routing_key': 'low.mytask'}
celery.send_task("myodule.mytask", queue=route['queue'], routing_key=route['routing_key']`