将消息从"reply-to"队列复制到另一个队列



我主要在RPC模式下使用rabbitMq,但我也想将请求和响应消息复制到另一个队列。

最后,我想要实现的是,外部 consummer 可以通过监听队列来查看所有 trafic 的情况,我们称之为"日志记录队列"。

复制传入消息是可以的,我只需要使用扇出交换或将我的日志记录队列绑定到使用的交换,其路由密钥与 RPC 调用使用的路由密钥相同。

但是我无法找到一种方法来"扇出"通过直接回复功能发送的消息。

到目前为止,我知道响应消息以 amqp.rabbitmq.reply-to.generated Name 的形式发送到带有生成的routing_key的默认直接交换,并且由于默认交换是不可触及的,我无法复制这些消息。

你知道有什么办法吗?

我有一个我宁愿避免的解决方案:让客户端将从回复中收到的响应重新发送到"日志记录队列"。

但这意味着我的客户负责这个"日志记录"功能,我宁愿不这样做。

顺便说一句,即使我认为这无关紧要,因为它可能是 rabbitMq 服务器配置问题,我也使用 Spring-AMQP 客户端

你不能用固定的回复来做到这一点,因为没有真正的队列/交换。

但是,您可以将每个RabbitTemplate配置为使用固定回复队列和回复容器将回复从该队列定向到模板。

文档在这里。

此外,使用此机制时,可以将模板的replyAddress配置为交换和路由密钥的形式。

/**
* An address for replies; if not provided, a temporary exclusive, auto-delete queue will
* be used for each reply, unless RabbitMQ supports 'amq.rabbitmq.reply-to' - see
* https://www.rabbitmq.com/direct-reply-to.html
* <p>The address can be a simple queue name (in which case the reply will be routed via the default
* exchange), or with the form {@code exchange/routingKey} to route the reply using an explicit
* exchange and routing key.
* @param replyAddress the replyAddress to set
*/
public synchronized void setReplyAddress(String replyAddress) {...}

您只需像往常一样设置模板和容器,使模板成为容器的侦听器......

@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}

同样,这是来自文档。

我找到了实现我想要的方法。

这并不像加里·罗素(Gary Russell(提出的那样灵活: https://stackoverflow.com/a/59976806/2546702

但是,我可以利用 firehose 功能并在 amq.rabbitmq.trace 交换上绑定队列(或交换以获得更多控制(,并将路由键设置为"发布"。(末尾的点很重要(

这允许记录发布到默认交易所的消息,包括回复消息。

当然,使用Firehose会影响性能,但就我而言,这没什么大不了的,因为RabbitMQ未得到充分利用。

由于我有 16 个队列要侦听,因此我宁愿不为每个队列使用不同的模板。我可以对所有 RPC 队列使用单个回复队列,但这将是一个瓶颈。

因此,如果没有硬NOGO,Firehose似乎是一个不错的选择。

最新更新