我有一个Kafka Streams DSL应用程序,我们要求只进行一次处理,为此我添加了配置
streamConfig.put(processing.gurantee, "exactly_once");
我用的是卡夫卡2.7我有两个问题
- exactly_once和exactly-once_beta之间有什么区别
- 如何测试此功能以确保我的消息只被处理一次
谢谢!
exactly_once_beta
是对exactly_once
的改进。exactly_once
为每个流任务(子拓扑和输入分区的组合(使用事务生成器,而exactly_once_beta
为Kafka Streams客户端的每个流线程使用事务生成器。每个生产者都有单独的内存缓冲区、单独的线程、单独的网络连接,这可能会限制输入分区的数量(即任务数量(。大量的生产者也可能给经纪人带来更大的负担。因此,exactly_once_beta
具有更好的缩放特性。您可以在KIP-447中找到更多详细信息。
注意,在Apache Kafka 3.0中,exactly_once
将被弃用,exactly_once_beta
将被重命名为exactly_once_v2
。详见KIP-732。
对于测试,您可以从Apache Kafka repo:中的测试中获得灵感
- https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
- https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
- https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/streams/streams_eos_test.py
基本上,您需要创建一个故障转移场景,并验证是否没有多次向输出主题生成消息。请注意,消息可能被处理多次,但输出主题中的结果必须显示为只处理过一次。您可以在这里找到一个非常好的关于一次性语义的讨论,它还解释了故障切换场景:https://www.confluent.io/kafka-summit-london18/dont-repeat-yourself-introducing-exactly-once-semantics-in-apache-kafka/