RabbitMQ — 为什么错误的订阅者会收到已发布的消息?



我有两个服务,管理器收集器

  1. 管理器使用路由密钥user.collected订阅队列COLLECTED_USER,并调用UserCollected处理程序。
  2. 收集器订阅了具有路由密钥user.collect的队列COLLECT_USER,并调用CollectUser处理程序。

可以有多个收集器,因此我已将exclusive设置为false(代码见下文(。

还有其他服务可以侦听事件,例如

  • user.created
  • user.updated
  • user.deleted

此外,还有一些服务可以侦听更一般的事件,例如

  • #.created
  • user.#

等等。

所以我正在使用topic交换。

设置

| exchange | type  | routingKey     | queueName      |
| -------- | ----- | -------------- | -------------  |
| MY_APP   | topic | user.collect   | COLLECT_USER   |
| MY_APP   | topic | user.collected | COLLECTED_USER |

应该发生什么:

  1. 管理器使用路由发布消息密钥user.collect
  2. 收集器获取user.collect消息并调用CollectUser处理程序
  3. 收集器的CollectUser处理程序确实有效,然后发布带有路由密钥的消息user.collected
  4. 管理器获取user.collected消息并调用UserCollected处理程序

实际发生的情况:

  1. 管理器使用路由发布消息密钥user.collect(正确(
  2. 收集器获取user.collect消息并调用CollectUser处理程序(正确(
  3. 管理器还会获取user.collect消息,并使用错误的数据调用UserCollected处理程序。(错(
  4. 收集器的CollectUser处理程序确实有效,然后发布带有路由密钥user.collected的消息(正确(
  5. 管理器获取user.collected消息并调用UserCollected处理程序(正确(

我的问题

为什么经理会收到user.collect消息,给定:

  1. 它侦听的是COLLECTED_USER队列而不是COLLECT_USER队列,并且
  2. 正在侦听COLLECT_USER队列的收集器已处理该消息。

实现详细信息

我按如下方式创建订阅者和发布者(为相关性而修剪(

创建订阅服务器

给定AMQPurl和参数urlexchangetyperoutingKeyqueueNamehandler

const connection = await amqp.connect(url)
const channel = await connection.createChannel()
channel.assertExchange(exchange, type, { durable: true })
const result = await channel.assertQueue(queueName, { exclusive: false })
channel.bindQueue(result.queue, exchange, routingKey)
channel.prefetch(1)
channel.consume(result.queue, handler)

创建发布服务器

给定 AMQPurl和参数urlexchangetype

const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })

出版

给定channel和参数exchangeroutingKeymessage

await channel.publish(exchange, routingKey, message)

注意

这个问题是 RabbitMQ 的后续问题 — 为什么在使用主题交换时忽略我的路由密钥 .

我终于弄清楚了我的问题是什么。 一个肮脏的交换。 在尝试这一点时,我无意中添加了一个将消息路由到错误队列的交换,这引起了我的困惑。

为了修复它,我启动了 RabbitMQ 管理员 GUI 并删除了所有队列,让我的代码创建所需的队列。如上所述的代码没有问题。

最新更新