芹菜工人:如何从所有队列中消费

  • 本文关键字:队列 芹菜 python celery
  • 更新时间 :
  • 英文 :


我有

  • 设置CELERY_CREATE_MISSING_QUEUES = True
  • 未定义CELERY_QUEUES
  • 定义的CELERY_DEFAULT_QUEUE = 'default'(直接类型)
  • 一个自定义路由器类,用于创建动态路由,如中所示这张票(https://github.com/celery/celery/issues/150)

我看到自定义路由器返回的路由中的新队列被创建,我认为这是因为CELERY_CREATE_MISSING_QUEUES

现在,在我运行的工作节点中,我不传递-Q参数,它只从"默认"队列中消耗,该队列似乎与文档-一致

默认情况下,它将从CELERY_QUEUES设置(如果未指定,则默认为队列命名为芹菜)。

有没有任何方法可以让我的工作节点从所有队列中消费,包括动态创建的队列?

谢谢,

需要告诉工作人员这些自动或动态创建的队列,因此您需要一种方法来获取这些队列名称并存储它们——可能是在创建它们时,也可能是在使用RabbitMQ作为代理时从rabbitmqctl list_queues获取它们,例如,添加一个信号处理程序,将这些动态队列添加到工作人员中以从中消费。

例如使用celeryd_after_setup信号:

from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def add_dynamic_queue(sender, instance, **kwargs):
    # get the dynamic queue, maybe stored somewhere
    queue = 'dynamic_queue'
    instance.app.amqp.queues.select_add(queue)

如果您总是创建新的动态队列,您还可以命令工作人员在运行时使用从这些队列开始消费

#command all workers to consume from the 'dynamic_queue' queue
app.control.add_consumer('dynamic_queue', reply=True)
# command specific workers
app.control.add_consumer('dynamic_queue', reply=True, destination=[w1@example])

请参阅添加消费者。

我希望这能有所帮助,当我得到更多信息时,我会编辑这个问题。

相关内容

  • 没有找到相关文章

最新更新