CQRS和活动来源-阅读您自己的活动



目前,我正在实现事件驱动架构,并有一个用于命令的服务(写入部分(和另一个用于查询的服务(读取部分(。

我现在在做什么。

  1. 接受CommandService上的命令
  2. 在事件总线上存储事件和发布事件
  3. ReadService侦听这些事件并更新读取模型

如果你听自己的活动,这听起来不错。假设我收听来自CommandService 的外部事件

  1. 收听事件
  2. 处理此事件的命令
  3. 将域生成的事件存储在事件存储中,并将此事件发布到事件总线
  4. ReadService侦听这些事件并更新读取模型

在这种方法中,我可以看到更新读取模型的延迟是原来的两倍。首次延迟->CommandService时间拉取事件第二延迟->读取从CommandService生成的事件的服务时间。

我在想,如果我更新ReadService,在不需要事件总线的情况下直接监听CommandService事件存储,那么我可以减少其中一个延迟。

你觉得怎么样?

我们不久前也做过类似的事情。基本上,我们有

  1. 实现流程管理器模式的服务,并使用EventSourcing和Saga在多个服务之间执行一些编排逻辑。存储在数据库中的每个事件都包含一个序列化为Avro格式的EventPayload,其中包含事件发生时进程的状态。该有效负载包含所有状态,而不仅仅是差异,所以我们在处理过程中一直在更新该有效负载
  2. 我们已经使用Kafka Connect JDBC源连接器从EventStore中读取,基本上,一旦在EventStore中发布了新事件,连接器就会读取该事件并将其发布到Kafka(Kafka被用作EventBus(
  3. 放置在另一个服务中的读取模型是通过Kafka更新的(这里有两种方法:使用KafkaConnect JDBC Sink Connector或使用Kafka Consumer并从Consumer进行事务更新(

我希望它能帮助你!

我在想,如果我更新ReadService以监听CommandService事件存储直接不需要事件总线,然后我可以减少其中一个延迟。

是的,你会减少延迟,但你会在两个服务之间引入耦合,这并不一定是坏事,但如果两个服务解耦(使用总线(,你可以独立扩展每个服务。

此外,如果您使用RabbitMQ等托管总线,您将受益于消息确认、多种类型的消息传递和队列。。等

最新更新