我有
- 设置
CELERY_CREATE_MISSING_QUEUES = True
- 未定义
CELERY_QUEUES
- 定义的
CELERY_DEFAULT_QUEUE = 'default'
(直接类型) - 一个自定义路由器类,用于创建动态路由,如中所示这张票(https://github.com/celery/celery/issues/150)
我看到自定义路由器返回的路由中的新队列被创建,我认为这是因为CELERY_CREATE_MISSING_QUEUES
。
现在,在我运行的工作节点中,我不传递-Q
参数,它只从"默认"队列中消耗,该队列似乎与文档-一致
默认情况下,它将从CELERY_QUEUES设置(如果未指定,则默认为队列命名为芹菜)。
有没有任何方法可以让我的工作节点从所有队列中消费,包括动态创建的队列?
谢谢,
需要告诉工作人员这些自动或动态创建的队列,因此您需要一种方法来获取这些队列名称并存储它们——可能是在创建它们时,也可能是在使用RabbitMQ作为代理时从rabbitmqctl list_queues
获取它们,例如,添加一个信号处理程序,将这些动态队列添加到工作人员中以从中消费。
例如使用celeryd_after_setup
信号:
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def add_dynamic_queue(sender, instance, **kwargs):
# get the dynamic queue, maybe stored somewhere
queue = 'dynamic_queue'
instance.app.amqp.queues.select_add(queue)
如果您总是创建新的动态队列,您还可以命令工作人员在运行时使用从这些队列开始消费
#command all workers to consume from the 'dynamic_queue' queue
app.control.add_consumer('dynamic_queue', reply=True)
# command specific workers
app.control.add_consumer('dynamic_queue', reply=True, destination=[w1@example])
请参阅添加消费者。
我希望这能有所帮助,当我得到更多信息时,我会编辑这个问题。