我正在尝试启动芹菜工人,因此它只侦听单个队列。这不是问题,我可以这样做:
python -m celery worker -A my_module -Q my_queue -c 1
但是现在我也希望这个my_queue
队列是一个广播队列,所以我在我的 celeryconfig 中这样做:
from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)
但是一旦我这样做,我就无法再启动我的工人了,我从 rabbitmq 收到错误:
amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'
如果我在没有-Q
的情况下启动 worker (但如上所述将Broadcast
留在celeryconfig.py
中)并且我列出了 rabbitmq 队列,我可以看到广播队列的创建和命名如下所示:
bcast.43fecba7-786a-461c-a322-620039b29b8b
同样,如果我在 worker 中定义此队列(使用如上所述的-Q
)或celeryconfig.py
中的简单Queue
,如下所示:
from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)
我可以在 rabbitmq 中看到这个队列,如下所示:
my_queue
在定义队列时,我在调用中放入Broadcast
内容并不重要 - 这似乎是内部芹菜名称,而不是传递给 rabbitmq。
所以我猜测当工人启动时,my_queue
被创建,一旦完成就不能Broadcast
.
我可以有一个监听任何队列(不仅是my_queue
)的工作线程,我将从删除-Q
参数开始。但是,如果能够有一个只侦听该特定队列的进程,那就太好了,因为我在那里投入的任务速度很快,而且我想尽可能降低延迟。
--- 编辑 1 --- 花了一些时间解决这个问题bcast
似乎上面提到的队列没有一致出现。重置 rabbitmq 并在没有选项的情况下运行芹菜bcast
队列没有出现-Q
......
使用代理发送消息时,客户端和工作线程必须就相同的配置值达成一致。如果必须更改配置,则需要清除现有消息并重新启动所有内容,以便它们同步。
启动广播队列时,可以设置交换类型并配置队列。
from kombu.common import Broadcast
from kombu import Exchange
exchange = Exchange('custom_exchange', type='fanout')
CELERY_QUEUES = (
Broadcast(name='bcast', exchange=exchange),
)
现在,您可以使用
celery worker -l info -A tasks -Q bcast