为什么消息优先级不对 RabbitMQ 中的消息进行排序?



我与生产者和消费者一起运行长生不老药程序。我为我的消息设置了优先级。但是优先级不起作用。

我的程序:

发行人:

Basic.publish(channel, "my_exchange", "my_queue", "1", persistent: true, priority: 2)
Basic.publish(channel, "my_exchange", "my_queue", "2", persistent: true, priority: 3)
Basic.publish(channel, "my_exchange", "my_queue", "3", persistent: true, priority: 1)
Basic.publish(channel, "my_exchange", "my_queue", "4", persistent: true, priority: 0)
Basic.publish(channel, "my_exchange", "my_queue", "5", persistent: true, priority: 4)

消费者:

def init(_) do
Connection.open(Application.get_env(:app, :rabbitmq))
{:ok, channel} = Channel.open(conn)
:ok = Basic.qos(channel, prefetch_count: 1, global: true)
{:ok, _consumer_tag} = Basic.consume(channel, "my_queue", nil, arguments: [{"x-priority", :signedint, 100}])
{:ok, %{channel: channel}}
end
def handle_info({:basic_deliver, payload, %{delivery_tag: tag, priority: priority}}, %{channel: channel} = state) do
Logger.info("Received message with payload: #{payload} and priority: #{priority}")
Basic.ack(channel, tag)
{:noreply, state}
end

发布后,我运行消费者。

预期产出:

Received message with payload: 5 and priority: 4
Received message with payload: 2 and priority: 3
Received message with payload: 1 and priority: 2
Received message with payload: 3 and priority: 1
Received message with payload: 4 and priority: 0

实际输出:

Received message with payload: 1 and priority: 2
Received message with payload: 2 and priority: 3 
Received message with payload: 3 and priority: 1
Received message with payload: 4 and priority: 0
Received message with payload: 5 and priority: 4

我做错了什么?消息的优先级不起作用吗?

由于"队列"的基本含义,人们不应该期望消息在队列中排序(大约任何队列都是LIFO或FIFO,RabbitMQ的一个是后者(。

在队列中随机排列消息以按每个插入的优先级对它们进行排序会大大降低性能。RabbitMQ 实际允许的(从 3.5.0 开始生效(是:

  • 确定队列的优先级和
  • 优先考虑消费者。

注意:尽管如此,从 3.5.0 开始,RabbitMQ允许对卡在队列中的消息进行排序,假设消费者获得的消息比他们进入队列的速度慢。在这种情况下,未使用的邮件将被排序。

但是,它仍然无法保证排序。优先级只允许您让一些队列/消费者"举手"首先接收消息,除非它们被阻止。否则,优先级链中的下一个将收到一条消息。

如果你需要对收入进行排序(很难想象在这种情况下为什么要拿起一个消息队列,但仍然如此(,你必须在发送到发布者的队列之前,或者在从消费者队列中收集所有预期的消息之后对其进行排序。

正如这里所指出的,正确的队列参数是x-max-priority。您正在使用x-priority,因此您的队列未声明为优先级队列。

文档: https://www.rabbitmq.com/priority.html

只是为了消除任何疑问,您的消费者和生产者似乎同时连接和处理。发生这种情况时,消息将直接路由到使用者(实质上(,而无需排队。

因此,RabbitMQ 工作正常,但您的期望需要稍作调整。消息优先级将仅对队列中的消息进行操作,即使这样,在某些情况下(例如并行使用者(可能不遵守优先级。

底线 - 不要编写要求以任何特定顺序传递消息的单元测试,因为这不是消息队列系统提供的保证。

最新更新