启动芹菜工作线程并为广播队列启用它



我正在尝试启动芹菜工人,因此它只侦听单个队列。这不是问题,我可以这样做:

python -m celery worker -A my_module -Q my_queue -c 1

但是现在我也希望这个my_queue队列是一个广播队列,所以我在我的 celeryconfig 中这样做:

from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)

但是一旦我这样做,我就无法再启动我的工人了,我从 rabbitmq 收到错误:

amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'

如果我在没有-Q的情况下启动 worker (但如上所述将Broadcast留在celeryconfig.py中)并且我列出了 rabbitmq 队列,我可以看到广播队列的创建和命名如下所示:

bcast.43fecba7-786a-461c-a322-620039b29b8b

同样,如果我在 worker 中定义此队列(使用如上所述的-Q)或celeryconfig.py中的简单Queue,如下所示:

from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)

我可以在 rabbitmq 中看到这个队列,如下所示:

my_queue

在定义队列时,我在调用中放入Broadcast内容并不重要 - 这似乎是内部芹菜名称,而不是传递给 rabbitmq。

所以我猜测当工人启动时,my_queue被创建,一旦完成就不能Broadcast.

我可以有一个监听任何队列(不仅是my_queue)的工作线程,我将从删除-Q参数开始。但是,如果能够有一个只侦听该特定队列的进程,那就太好了,因为我在那里投入的任务速度很快,而且我想尽可能降低延迟。

--- 编辑 1 --- 花了一些时间解决这个问题bcast似乎上面提到的队列没有一致出现。重置 rabbitmq 并在没有选项的情况下运行芹菜bcast队列没有出现-Q......

使用代理发送消息时,客户端和工作线程必须就相同的配置值达成一致。如果必须更改配置,则需要清除现有消息并重新启动所有内容,以便它们同步。

启动广播队列时,可以设置交换类型并配置队列。

from kombu.common import Broadcast
from kombu import Exchange

exchange = Exchange('custom_exchange', type='fanout')
CELERY_QUEUES = (
Broadcast(name='bcast', exchange=exchange),
)

现在,您可以使用

celery worker -l info -A tasks -Q bcast 

相关内容

  • 没有找到相关文章

最新更新