如何保证在非事务处理的轻量级环境中传递消息?
例如:
- 正常情况:写入数据库,提交,向ZeroMQ|Redis|OtherMQ发送消息,消费者拉消息继续处理
- 0,05%的情况:写入数据库,提交,应用程序死亡,没有发送消息,没有消费者拉消息,处理不完整
在这种情况下,如何避免丢失消息(避免不发送消息)?
编辑:邮件必须只传递一次。
在这个场景中,您有两个共享资源(数据库和队列),您希望它们一起进行事务处理。如果消息发送到队列,则希望数据库提交。如果数据库未成功发送,您希望数据库不提交,反之亦然。这只是像2PC这样的全局事务机制。然而,实现全球交易机制并不是那么容易,而且成本也很高。
我建议您在生产者端实现至少一种策略,在消费者端实现幂等性,以提供一致性。
您应该在生产者端的数据库中创建一个消息表,并在将消息发送到队列之前将其持久化。然后使用一个调度线程(这里可能有多个线程来提高吞吐量,但如果您的消息需要按照它们产生的顺序消耗,请小心)或任何其他东西,您可以将它们发送到队列中,并将它们标记为已发送,以确保已发送的消息不会再次发送。即使您这样做了,在某些情况下,您的消息可能会被多次发送(例如,您将消息发送到队列,而您的应用程序在将消息标记为已发送之前崩溃)。但这不是问题,因为我们已经希望在生产者端实现至少一次策略,这意味着我们希望消息至少被发送到队列一次。
为了防止使用者使用在生产者端多次生成的相同消息,您应该实现幂等使用者。简单地说,您可以将已消费消息的id保存到消费者端的数据库表中,在处理来自队列的消息之前,您可以检查它是否已经被消费。如果它已经被消耗掉了,您应该忽略它并获得下一条消息。
当然,在微服务环境中还有其他选项可以提供一致性。你可以在这个伟大的博客上找到其他解决方案-https://www.nginx.com/blog/event-driven-data-management-microservices/.我上面解释的解决方案也存在于这个博客中。您可以在"使用本地事务发布事件"部分找到它。
这里可能有一个简单的方法。
假设您有交易:
- 将数据写入数据库
- 通过ZMQ发送消息
- 写入发送正常的数据库
所以假设你在第2步或第3步时应用程序崩溃。如果是这样的话,您不知道最后一条消息是否收到了客户队列,并且您必须在没有最后一次确认的情况下重新启动所有消息后重新发送(步骤3)。
问题出在消费者方面,因为他们可能会收到两次消息。为了解决这个问题,您可以在每条消息中发送一个始终在增加的事务ID。消费者必须注意到最后一条消息的交易ID。当传入消息的事务ID不高于上一条消息的事务标识时,可以忽略该消息。
现在的问题是,您是否可以修改消息结构以及可以使用哪个事务ID。