芹菜任务队列不与RabbitMQ一起使用



芹菜任务成功执行而没有队列

设置。

BROKER_URL = "amqp://user:pass@localhost:5672/test"
# Celery Data Format
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERYD_TASK_SOFT_TIME_LIMIT = 60
CELERY_IGNORE_RESULT = True
@app.task
def test(a,b,c):
    print("doing something here...")

命令

celery worker -A proj -E -l INFO

上面的设置工作者正在成功执行。

我已将队列引入了芹菜任务。

添加了上一个设置的配置

from kombu.entity import Exchange, Queue
CELERY_QUEUES = (
    Queue('high', Exchange('high'), routing_key='high'),
    Queue('normal', Exchange('normal'), routing_key='normal'),
    Queue('low', Exchange('low'), routing_key='low'),
)
CELERY_DEFAULT_QUEUE = 'normal'
CELERY_DEFAULT_EXCHANGE = 'normal'
CELERY_DEFAULT_ROUTING_KEY = 'normal'
CELERY_ROUTES = {
    'myapp.tasks.test': {'queue': 'high'},
}

命令

celery worker -A proj -E -l INFO -n worker.high -Q high

呼叫

 test.delay(1, 2, 3)

当我执行队列工作时,没有运行。我是否错过了任何配置?

更改芹菜仪perery_task_routes-在版本4

中更改

首先,请确保在兔子&高工作原木。

然后,尝试将CELERY_ROUTES更改为:

CELERY_ROUTES = {
    'myapp.tasks.test': {
        'exchange': 'high',
        'exchange_type': 'high',
        'routing_key': 'high'
    }
}

或使用queue调用任务,例如:

test_task = test.signature(args=(1, 2, 3), queue='high', immutable=True)
test_task.apply_async()

最新更新