如何在 RabbitMQ Exchange 中将旧消息排队到新队列中?



我与一种仅将消息重定向到队列topic进行了交换payments

将来的某个地方,我将决定添加另一个队列payment_analyze来分析所有已排队的新旧消息。

持久交换和队列在 Rabbit MQ 重新启动后幸存下来,持久消息被写入磁盘,但是当将新队列绑定到旧的持久交换时,旧消息不会被重定向(只有新消息会被重定向)

根据我的理解,这是预期的行为,因为交换不存储消息,仅充当"代理">

我如何实现这一点?

可能的解决方案

创建一个名为parking的队列并将每个排队的消息添加到其中,每当添加新队列时,都会使用来自parking的消息,而不确认以使新队列"半"保持最新。

即使您在payments队列上配置的持久消息,这也只意味着消息将在代理重新启动后幸存下来 - 一旦消息被使用并确认,它就会被删除。

如果您知道将来的某个时候需要payment_analyze队列,那么预先创建此队列/绑定并将消息路由到payment_analyzepayments是否可行?payment_analyze上的消息将存入银行,直到您准备好开始使用它们。注意:如果您要生成大量消息,则此方法可能会导致存储问题...

作为替代方案,您可以将消息作为payments队列使用者(或完全不同的队列/使用者)的一部分写入 BLOB 存储(或其他数据存储),然后当您准备好引入payment_analyze队列时,您可以编写一个脚本来读取 BLOB 存储中的所有旧消息并将它们发送到 RabbitMQ 交换。通过"主题"交换 - 请参阅此处 - 您可能可以巧妙地在队列绑定中使用通配符和路由键,以确保旧消息(来自 BLOB 存储)和新消息都路由到payment_analyze队列,但只有新消息路由到payments队列(以便您的payments队列使用者不会重新处理旧消息)。

另一种选择(假设您没有过度投资于RabbitMQ)可以考虑Apache Kafka,它可以很好地处理这种情况,因为消息一旦被订阅者处理,就不会自动从分区中删除。

无论如何,只有几个选项需要考虑...

最新更新