使用 Kafka 为微服务提供事件溯源



我有几个使用来自 kafka 的数据的微服务。它们使用并向代理生成数据。

这些微服务只有易失性存储(hazelcast(。当存储丢失时,我需要根据 kafka 中的主数据重建它。

我的 naiv 实现只是再次消耗这些数据,但随后我向代理生成一些旧数据。这再次触发了其他微服务,这似乎是一个坏主意。

有没有处理此用例的标准方法?对我来说,这似乎是一个非常普遍的问题,还是我弄错了什么?

这在以前有人问过。

使用 Kafka 作为事件存储应该无关紧要,因为问题是微服务重新发送事件。

花了几天时间,我想出了以下解决方案。

关键思想是在两种模式下进行同步,即恢复和正常

  • 在恢复模式下,我只消耗数据,但不生成任何数据。
  • 在正常模式下,我消耗和生成数据。

在 Kafka 中,我使用属于不同消费者组的两个侦听器来实现这一点。启动时,所有侦听器都会停止,我决定启用侦听器类型。一旦所有恢复侦听器的偏移量达到普通侦听器的水印,我就会停止恢复列表器并启动普通侦听器。

在我的代码的相关部分下面:

public void startListeners() {
log.debug("get partitions from application");
final List<KafkaPartitionStateKey> partitions = getPartitions();
log.debug("load partition state from hazelcast");
final Map<KafkaPartitionStateKey, KafkaPartitionState> kafkaPartitionStates = kafkaPartitionStateService.loadKafkaPartitionStateMap();
log.debug("check if in sync");
if (areAllPartitionsReady(partitions, kafkaPartitionStates)) {
log.info("all partitions ready, not need to start recovery");
this.messageListenerContainers.forEach(this::startContainer);
return;
}
log.debug("load consumer group offsets from kafka");
consumerGroupOffsets = getConsumerGroupOffsets();
log.debug("create missing partition states");
final List<KafkaPartitionState> updatedPartitionStates = getOrCreatePartitionStates(partitions, kafkaPartitionStates, consumerGroupOffsets);
log.debug("check if all partitions are ready");
if (getNumberOfNotReadyPartitions(updatedPartitionStates) == 0) {
log.info("all partitions ready, no need to start recovery");
this.messageListenerContainers.forEach(this::startContainer);
return;
}
log.info("----- STARTING RECOVERY -----");
this.recoveryListenerContainers.forEach(this::startContainer);
}

我希望这对某人有用...

最新更新