RabbitMQ:连接丢失后如何处理不需要的重复取消确认消息?



在我的应用程序(多个实例(中,我们偶尔会看到由于网络问题,我的应用程序和 rabbitmq 之间的连接丢失的情况(我的应用程序和 rabbitmq 都处于活动状态(,然后在连接恢复(重新建立(后,我们将收到未确认的消息。

这给我们带来了一个问题,因为我的应用程序没有死,它仍在处理之前收到的相同消息,但现在消息被重新发送,它导致应用程序再次处理该消息(这对我们来说可能是致命的(。

由于应用程序具有多个实例,因此一个实例不容易检查另一个实例是否同时处理相同的消息。我们不能简单地过滤掉重新传递的消息,因为我们需要此功能来处理实例/应用崩溃/重新部署。

似乎没有一个 API 可以告诉 rabbitmq 何时不重新传递未发送的消息。

那么处理这种情况的推荐做法是什么?

谢谢

这种情况的一般解决方案是使使用者以幂等方式处理消息。通常我所做的是从生产者端(如果消息正文中没有唯一标识符(我向消息正文添加一个属性幂等 ID,这是一个 guid,在消费者端,对于每条消息,此 id 根据数据库中存储的值进行验证,任何重复项都将被拒绝。

此方法也适用于可能从另一个集群中铲除的消息,或者如果在同一集群中有多个使用者实例正在侦听,那么这种方法也保证一次性处理。

建议在这里查看 RabbitMQ 可靠性指南

是的,精确一次交付不是 RabbitMQ 擅长的。事实上,我想说你可能不应该用它来解决这类问题。老实说,真正解决此问题的唯一方法是使用分布式事务或锁定。

无论如何,你可以通过在消费者收到消息后立即确认问题来扭转问题,然后再开始处理它。这至少可以避免与RabbitMQ相关的重复问题。这是最多一次交付。

当然,这意味着如果消费者崩溃,消息将永远丢失。因此,您需要在确认消息之前保留消息,以便以后可以恢复它,并且消费者应在完成后将其删除。

考虑到崩溃很少见,您可以有一个专用进程来处理这些持久化的消息。或者就此而言,手动处理它们。

请注意,您正在将重复问题推到您面前,因为使用者在完成处理后可能无法删除持久化的消息,但至少您可以选择随心所欲地实现它。

在这种情况下,存储可以是文件,RDBMS或ZooKeeper或Redis之类的任何东西,以锁定/解锁动态消息。

相关内容

最新更新