我有两个服务,管理器和收集器。
-
管理器使用路由密钥
user.collected
订阅队列COLLECTED_USER
,并调用UserCollected
处理程序。 -
收集器订阅了具有路由密钥
user.collect
的队列COLLECT_USER
,并调用CollectUser
处理程序。
可以有多个收集器,因此我已将exclusive
设置为false
(代码见下文(。
还有其他服务可以侦听事件,例如
user.created
,user.updated
,user.deleted
此外,还有一些服务可以侦听更一般的事件,例如
#.created
user.#
等等。
所以我正在使用topic
交换。
设置
| exchange | type | routingKey | queueName |
| -------- | ----- | -------------- | ------------- |
| MY_APP | topic | user.collect | COLLECT_USER |
| MY_APP | topic | user.collected | COLLECTED_USER |
应该发生什么:
- 管理器使用路由发布消息密钥
user.collect
- 收集器获取
user.collect
消息并调用CollectUser
处理程序 - 收集器的
CollectUser
处理程序确实有效,然后发布带有路由密钥的消息user.collected
- 管理器获取
user.collected
消息并调用UserCollected
处理程序
实际发生的情况:
- 管理器使用路由发布消息密钥
user.collect
(正确( - 收集器获取
user.collect
消息并调用CollectUser
处理程序(正确( - 管理器还会获取
user.collect
消息,并使用错误的数据调用UserCollected
处理程序。(错( - 收集器的
CollectUser
处理程序确实有效,然后发布带有路由密钥user.collected
的消息(正确( - 管理器获取
user.collected
消息并调用UserCollected
处理程序(正确(
我的问题
为什么经理会收到user.collect
消息,给定:
- 它侦听的是
COLLECTED_USER
队列而不是COLLECT_USER
队列,并且 - 正在侦听
COLLECT_USER
队列的收集器已处理该消息。
实现详细信息
我按如下方式创建订阅者和发布者(为相关性而修剪(
创建订阅服务器
给定AMQPurl
和参数url
、exchange
、type
、routingKey
、queueName
和handler
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
channel.assertExchange(exchange, type, { durable: true })
const result = await channel.assertQueue(queueName, { exclusive: false })
channel.bindQueue(result.queue, exchange, routingKey)
channel.prefetch(1)
channel.consume(result.queue, handler)
创建发布服务器
给定 AMQPurl
和参数url
、exchange
和type
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })
出版
给定channel
和参数exchange
、routingKey
和message
await channel.publish(exchange, routingKey, message)
注意
这个问题是 RabbitMQ 的后续问题 — 为什么在使用主题交换时忽略我的路由密钥 .
我终于弄清楚了我的问题是什么。 一个肮脏的交换。 在尝试这一点时,我无意中添加了一个将消息路由到错误队列的交换,这引起了我的困惑。
为了修复它,我启动了 RabbitMQ 管理员 GUI 并删除了所有队列,让我的代码创建所需的队列。如上所述的代码没有问题。