如何始终从卡夫卡流中的最新偏移量消耗



我们的要求是,如果 kafka-stream 应用程序正在使用一个分区,它应该从该分区的最新偏移量开始消耗它。

这似乎是可行的

streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

现在,假设使用上述配置,kafka-stream 应用程序开始使用分区最新偏移量中的数据。一段时间后,应用程序崩溃。当应用重新上线时,我们希望它使用来自该分区的最新偏移量的数据,而不是上次读取的位置。

但是我找不到任何可以帮助使用 kafka-streams api 实现它的东西。

附言我们正在使用 kafka-1.0.0。

这不是开箱即用的

配置auto.offset.reset仅触发器,如果没有提交的偏移量,并且没有用于更改此行为的配置。

您可以在启动前手动操作偏移量不过使用bin/kafka-consumer-groups.sh - application.idgroup.id,您可以在重新启动应用程序之前"寻求结束"。

更新:

从 1.1.0 版本开始,您可以使用bin/kafka-streams-application-reset.sh工具来设置起始偏移量。若要使用该工具,应用程序必须处于脱机状态。(参见:https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application(

相关内容

  • 没有找到相关文章

最新更新