我仍在研究我在为什么卡夫卡消费者不产生结果?在那篇文章中,我问为什么设置
kstreams_props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
在将任何数据推向任何主题之前,似乎并没有将KAFKA状态重置为"宇宙开始"。我现在正在遇到这个问题的变体:
我的应用程序由一个生产者程序组成,该程序将数据推向KAFKA流和一个消费者程序,该程序将数据分组,汇总组,然后将所得的KTable转换回流中,我将其打印出来。
。汇总步骤本质上是将所有值加起来,然后将这些总和作为新数据放入输出流中。不过,我观察到的是,每次我运行程序时,所得的汇总值都会变得越来越大,好像Kafka在某种程度上保留了先前的结果,并包括汇总的结果。
为了尝试修复此问题,我删除了我的主题(除了 __consumer_offsets
,kafka不允许使用(,然后重新运行我的应用程序,但是汇总的值继续增长,如如果卡夫卡(Kafka(仍保留了以前的计算结果,即使删除中间主题的i 思想可以解决问题。我什至尝试停止和重新启动Kafka服务器,无济于事。
这是怎么回事,更重要的是,我该如何解决?我尝试了有关设置AUTO_OFFSET_RESET_CONFIG
的各种建议,也没有效果。我应该提到,我的应用程序的一个方面是我的原始生产商在Producer.send
呼叫中创建了自己的kafka时间戳,尽管已禁用似乎也没有效果。
预先感谢 - 马克
AUTO_OFFSET_RESET_CONFIG
仅在没有承诺偏移的情况下触发:如果启动应用程序,则首先要查找犯罪的偏移,并仅在没有有效偏移的情况下应用重置策略。
此外,对于Kafka流应用程序,重置偏移是不够的,您应该使用重置工具bin/kafka-streams-applicaion-reset.sh
-此博客文章详细说明了该工具:https://www.confluent.io/blog/blog/data---------重新处理与kafka-streams-resetting-a-streams-application/