我正在使用Kafka Stream API。当我启动应用程序时,有时会有一个缺口,我想从特定的偏移量开始消费。最早或最晚都不是我想要的。
streamProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
我想要的是一个场景,比如我在配置文件中设置偏移量或以毫秒为单位的日期,并从那时起开始消费。我想知道是否有办法做到这一点?
配置auto.offset.reset
仅在尚未提交偏移量的情况下对应用程序的首次启动有效。如果提交了偏移量,应用程序将始终从提交的偏移量恢复处理。
在Kafka Streams中,没有API可以显式设置启动偏移。消费者API将允许通过Consumer#seek()
进行此操作。
对于Kafka Streams,获得所需行为的一种方法是,停止应用程序,使用bin/kafka-consmer.group.sh
(或者更好的bin/kafka-streams-application-reset.sh
(,并提交所需的启动偏移量。如果您随后启动应用程序,它将获取提交的偏移量并从那里开始处理。