我正在寻找一种在kafkastreams中进行后处理工具的方法,该工具将允许从一个主题中的一开始重新处理数据(应用某些过滤器并将这些事件的更新版本编写为同一主题)。同时,有一个长期运行的申请处理数据。
为了重新处理仅到启动应用程序并在其之后停止的时间点,需要知道何时停止,这是当时最新产生的偏移。例如。可以在启动拓扑之前构建地图,该拓扑具有(分区 ->偏移)以了解这些限制,因此应用程序将能够在达到偏移时停止,将当前分区和偏移(通过处理器API)与该应用程序进行比较该初始地图的偏移限制。
从Kafka流中访问最新的偏移信息是否有可能?还有另一种解决方法吗?(我想您可以通过常规的Kafka消费者来创建它,寻求结束并获得位置,但是我询问Kafkastreams中是否有一个集成的解决方案)。
另外,只有在所有分区都达到其偏移时,如何轻轻停止应用程序,知道此信息已分发,以便您需要从所有情况下了解状态?
kafka/kafkastreams 2.1,scala 2.12
使用消费者获得倾斜度似乎是合理的。要停止应用程序,您需要构建一个跟踪进度的手动解决方案。例如,使用transformValues()
,您可以检查输入记录的主题名称,分区和偏移(使用context
对象通过init()
方法提供)。这应该允许您在处理所有数据时调用KafkaStreams#close()
。
您可能对讨论类似想法的这个KIP感兴趣:https://cwiki.apache.org/confluence/conflue/display/kafka/kafka/kip-95: incremental incremental batch batch batch kafka streams