如何在事件驱动架构中控制消息的幂等性?



我正在做一个项目,其中DynamoDB被用作数据库,应用程序的每个用例都是由在数据库中创建/更新项目后发布的message触发的。目前代码遵循此方法:

repository.save(entity);
messagePublisher.publish(event);

Udi Dahan 有一个名为Reliable Messaging Without Distributed Transactions的视频,他在视频中谈到了系统在保存到数据库后但在发布消息之前可能立即失败的情况的解决方案,因为消息不是事务的一部分。但是在他的解决方案中,我认为他假设使用SQL数据库,因为该过程涉及保存正在处理的消息的 correlationId、实体修改和要发布的消息作为事务的一部分。使用NoSQL数据库,我想不出一种干净的方法来存储有关消息的信息。

解决方案是使用DynamoDBstreams并订阅使用Lambda或其他服务发布的事件,以将其转换为特定于域的事件。我的问题是我无法从域逻辑发送消息,逻辑将分布在处理消息的服务中,Lambda/service对更改做出反应,解决方案将是特定于平台的。

还有其他方法可以解决这个问题吗?

我不能说基于 DynamoDB 的特定解决方案,因为我从未使用过这个引擎。但是我已经在MongoDB之上构建了一个事件驱动系统,所以我可以分享我的经验,你可能会发现对你的案例有用。

您可以使用不同的方法:

1) 基于事件溯源方法,您只需将用例生成的事件/消息保存在事务中。在Mongo中,当您只是将新项插入/追加到同一集合时,您可以确保原子性。无论如何,如果引擎不提供该功能,则查询操作非常集中,因此您至少可以减少出错的可能性。

存储所有事件后,可以使用它们并将其投影到给定状态,然后将更新的状态保留在另一个事务中。

在这里,您必须处理最终一致性,因为在预测事件之前,数据将在读取模型中过时。

2) 另一种方法是应用UnitOfWork模式,在该模式下缓存所有查询操作(插入/更新/删除)以保存事件和状态。用例完成后,您可以针对数据库执行所有缓存的查询(刷新)。这样,尽管操作不是原子的,但您再次将它们集中到足以最大程度地减少错误。

当然,如果您需要 ACID 数据库,最好使用该功能,任何其他方法都是接近它的解决方法。

关于发布事件,我不知道你的意思是将它们发布到消息传递传输机制,如 rabbitmq、Kafka 等。但这必须是一个后台进程,您可以在其中从数据库获取事件并发布它们,以便在同一事务中中断 2 阶段提交。

最新更新