如何让pika首先监听优先级队列



我有两个通过rabbitmq进行通信的微服务,我需要实现优先级消息。

第一个微服务充当发布者,用symfony+messenger(amqp-transport(编写。

第二个微服务充当消费者,用python+pika编写。

信使文档(https://symfony.com/doc/current/messenger.html#prioritized-transports(建议对不同的消息优先级使用单独的队列,该组件无法使用rabbitmq的内置功能来对消息进行优先级排序。实际上,发布者没有任何问题,我对其进行了配置,使必要的消息进入优先级队列。

消费者出现了问题,我无法让pika先读取优先级队列,然后再读取常规队列。

以下是我的信使组件配置示例:

framework:
messenger:
transports:
priority:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
exchange:
name: priority
queues:
priority: ~
normal:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
exchange:
name: normal
queues:
normal: ~
routing:
'AppMessagePriorityRequest': priority
'AppMessageNormalRequest': normal

这就是我填充队列的方式:

for ($i = 0; $i < 10; $i++) {
$bus->dispatch(new PriorityRequest($i, 'priority'));
$bus->dispatch(new NormalRequest($i, 'normal'));
}

这里有一个python+pika中的消费者实现示例:

import pika
import os
def do_work(self, connection, channel, delivery_tag, body):
print(body)

parameters = pika.URLParameters(os.getenv('MESSENGER_TRANSPORT_DSN'))
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='priority', durable=True)
channel.queue_declare(queue='normal', durable=True)
channel.basic_consume(queue='priority', on_message_callback=do_work, auto_ack=True)
channel.basic_consume(queue='normal', on_message_callback=do_work, auto_ack=True)
channel.start_consuming()

如果我们运行消费者代码,我们会得到以下输出:

{'id': 0, 'data': 'priority'}
{'id': 0, 'data': 'normal'}
{'id': 1, 'data': 'priority'}
{'id': 1, 'data': 'normal'}
{'id': 2, 'data': 'priority'}
{'id': 2, 'data': 'normal'}
{'id': 3, 'data': 'priority'}
{'id': 3, 'data': 'normal'}
{'id': 4, 'data': 'priority'}
{'id': 4, 'data': 'normal'}
{'id': 5, 'data': 'priority'}
{'id': 5, 'data': 'normal'}
{'id': 6, 'data': 'priority'}
{'id': 6, 'data': 'normal'}
{'id': 7, 'data': 'priority'}
{'id': 7, 'data': 'normal'}
{'id': 8, 'data': 'priority'}
{'id': 8, 'data': 'normal'}
{'id': 9, 'data': 'priority'}
{'id': 9, 'data': 'normal'}

消息按FIFO顺序处理,如何强制pika首先处理优先级队列中的消息,并且只有在优先级队列为空时才转到正常队列?

Pika不支持这一点。

一个选项是首先从优先级队列中basic_consume。当队列为空时,取消该使用者,然后从另一个队列中取消basic_consume。当该工作完成后,重复并返回优先级队列。


注意:RabbitMQ团队监控rabbitmq-users邮件列表,有时只回答StackOverflow上的问题