如何从头开始重新使用消息



我在代码中使用auto.offset.reset=earliest,并在以下代码的帮助下在kafka中使用了偏移量提交。

val offsetRanges=rdd.asInstanceOf[HasOffsetRanges].offsetRanges
inputStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

现在,当我运行程序时,它不会收到新消息,因为所有消息都已提交。

我正在QA中测试这段代码,所以想将偏移量重置为开始,但看起来最早的不起作用,它没有读取新消息,主题中也没有新消息。我想从头开始阅读消息,以进行测试。

如果最早提交的消息并没有从一开始就提取,有人能提供帮助吗?

只有当分区没有提交的偏移量时,才使用属性auto.offset.reset。您可以使用kafka-consumer-groups(作为Kafka的一部分(重置整个组的偏移量:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

最新更新