我正在尝试在 Kafka 流之上实现一个简单的 CQRS/事件溯源概念证明(如 https://www.confluent.io/blog/event-sourcing-using-apache-kafka/中所述)
我有 4 个基本部分:
commands
主题,它使用聚合 ID 作为按聚合顺序处理命令的键events
主题,聚合状态的每个更改都将发布到该主题(同样,键是聚合 ID)。本主题的保留策略为"从不删除">-
一个 KTable,用于减少聚合状态并将其保存到状态存储
事件主题流 -按聚合 ID>分组到 Ktable ->将聚合事件还原到当前状态 ->具体化为状态存储
-
命令处理器 - 命令流,左联接聚合状态 KTable。对于结果流中的每个条目,使用函数
(command, state) => events
生成生成的事件并将其发布到events
主题
问题是 - 有没有办法确保我在状态存储中拥有最新版本的聚合?
如果违反业务规则,我想拒绝命令(例如 - 如果实体被标记为已删除,则修改实体的命令无效)。但是,如果发布DeleteCommand
后紧随其后的ModifyCommand
,则删除命令将生成DeletedEvent
,但是在处理ModifyCommand
时,状态存储中的加载状态可能尚未反映该状态,并且将发布冲突事件。
我不介意牺牲命令处理吞吐量,我宁愿获得一致性保证(因为所有内容都按同一键分组,并且应该最终位于同一分区中)
希望这是清楚的:)有什么建议吗?
我认为 Kafka 还不适合 CQRS 和事件溯源,就像你描述的那样,因为它缺乏一种(简单的)方法来确保免受并发写入的影响。本文将对此进行详细讨论。
我所描述的方式的意思是,您希望命令生成零个或多个事件或失败并出现异常;这是具有事件源的经典 CQRS。大多数人都期待这种架构。
但是,您可以使用不同的样式进行事件溯源。您的命令处理程序可以为收到的每个命令生成事件(即DeleteWasAccepted
)。 然后,事件处理程序最终可以以事件源方式处理该事件(通过从其事件流重建聚合的状态)并发出其他事件(即ItemDeleted
或ItemDeletionWasRejected
)。因此,命令是触发后忘记的,异步发送,客户端不会等待立即响应。但是,它等待描述其命令执行结果的事件。
一个重要的方面是,事件处理程序必须以串行方式(恰好一次且按顺序)处理来自同一聚合的事件。这可以使用单个 Kafka 消费者组来实现。您可以在此视频中看到有关此体系结构的信息。
请阅读我的同事Jesper的这篇文章。Kafka是一个很棒的产品,但实际上根本不适合事件采购
https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c
我想出的一个可能的解决方案是实现一种乐观的锁定机制:
- 在命令上添加
expectedVersion
字段 - 使用 KTable
Aggregator
增加每个已处理事件的聚合快照的版本 - 如果
expectedVersion
与快照的聚合版本不匹配,则拒绝命令
这似乎提供了我正在寻找的语义