是否有任何方法可以从Java API中的特定偏移量开始消费kafka主题



我正在使用Kafka Stream API。当我启动应用程序时,有时会有一个缺口,我想从特定的偏移量开始消费。最早或最晚都不是我想要的。

streamProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

我想要的是一个场景,比如我在配置文件中设置偏移量或以毫秒为单位的日期,并从那时起开始消费。我想知道是否有办法做到这一点?

配置auto.offset.reset仅在尚未提交偏移量的情况下对应用程序的首次启动有效。如果提交了偏移量,应用程序将始终从提交的偏移量恢复处理。

在Kafka Streams中,没有API可以显式设置启动偏移。消费者API将允许通过Consumer#seek()进行此操作。

对于Kafka Streams,获得所需行为的一种方法是,停止应用程序,使用bin/kafka-consmer.group.sh(或者更好的bin/kafka-streams-application-reset.sh(,并提交所需的启动偏移量。如果您随后启动应用程序,它将获取提交的偏移量并从那里开始处理。

相关内容

最新更新