我有一个在springboot中具有精确once_v2保证的KafkaStreams应用程序。
-
App1: Reads from topic ",处理并写入主题。
-
App2读取主题"B"并写信给主题"C"
由于MongoDB暂时问题,App1需要很长时间来处理一条记录,但最终写入主题B。App2正确接收到来自主题B的数据,并写入主题";c &;"。
问题是App1需要很长时间来处理,这会导致生产者重新启动((InvalidProducerEpochException: producer attempt to produce with a old epoch)并创建一个新的epoch。
新的生产者再次读取相同的消息并将其写入新的生产者的主题中,因此我们最终得到了重复。App2再次读取新生产者生成的相同消息。
我想正好有一次可以防止这种情况的发生。
我正在使用Kafka Streams 3.2.0
是否有一种方法可以防止生产者重新启动时两次向主题写入相同的消息?
我希望消息不会重复,即使生产者重新启动。我试着将它设置为恰好一次,但它不适合这种情况
如果生产者失败,相应的事务将被终止。虽然消息物理上仍在输出主题中,但它们被标记为"aborted"。
下游消费者必须配置isolation.level="read_committed"
来过滤放弃的消息。
如果你有一个下游Kafka Streams应用,你也可以这样做。如果下游Kafka Streams应用程序使用一次处理保证,它将自动使用"read_committed"
模式,你不需要做任何额外的事情。
由于MongoDB暂时问题
顺便说一句:如果你正在使用EOS和像MongoDB这样的外部依赖,你不能得到一次保证,因为像访问MongoDB这样的副作用是不受保护的。