将使用者偏移量重置为 Kafka 流的开头



我正在使用Kafka流,并希望将Java的一些消费者偏移量重置到开头。KafkaConsumer.seekToBeginning(...)听起来是正确的做法,但我使用 Kafka Streams:

KafkaStreams streams = new KafkaStreams(builder, props);
...
streams.start();

我想根据我定义的混凝土流管道,这将在引擎盖下创建几个消费者。我可以访问这些吗?或者有没有其他方法可以以编程方式重置偏移量?

基于 Hans Jespersens 的答案,我成功地使用此代码来执行脚本在 Java 代码中执行的操作:

import kafka.tools.StreamsResetter;
StreamsResetter resetter = new StreamsResetter();
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER};
resetter.run(args);

该类是我在 maven 中使用的 kafka 核心库的一部分:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>

由于您使用的是 Kafka Streams,因此您不仅需要重置使用者偏移量,还需要重置 Streams 内部状态存储。

幸运的是,Kafka 提供了一个流应用程序重置工具。

见 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool

最新更新