Celery&&RabbitMQ 如何发布消息进行交换?



现在,我想将register事件发布到某个特殊的交换机,我可以使用芹菜远程检索和处理它。

事实上,我已经使用了send_task函数来实现这一点,但它必须通过task_name来指示应该执行哪个任务并使用它。所以它似乎不太适合我的目标。

我想要的就是:

  1. register消息发布到某个Exchange
  2. 远程机器1订阅此topicroute_key并捕获消息,使用is执行任务
  3. 远程机器2-与机器1相同,但执行另一个任务-接收该任务(可能需要回复特定的queue(

例如,就像这个工作流一样:

寄存器:

  • send_email
  • generate_info

如果我需要非标准交换,我会这样做。

在我的celeryconfig中,我指定了那个交换,并将队列分配给它,如下所示(在我的情况下,我需要fanout交换(:

from kombu.common import Broadcast
from kombu import Exchange, Queue
CELERY_QUEUES = (
Broadcast(name='queue_name', exchange=Exchange('queue_name', type='fanout')),
)

然后,我用芹菜multi生成worker,并将其分配给我的特定队列,如下所示:

celery multi start 1 -A my_project -Q:1 queue_name -c:1 1 (other options go here)

然后我可以将我的任务插入到队列中,如下所示:

from my_project import my_fancy_task
my_fancy_task.apply_async(args=(x, y, z), queue="my_queue")

我不太理解您的具体用例,如果您需要一台主机上的工作人员来使用一个队列中的任务,然后需要另一台主机的工作人员使用另一个队列的任务,那么只需将您的任务拆分为两个队列,并将每个主机配置为启动工作人员,并将其分配给对您有意义的任何队列。也许这会有所帮助:RabbitMQ 的主题交换歧义

相关内容

  • 没有找到相关文章

最新更新