我在代码中使用auto.offset.reset=earliest
,并在以下代码的帮助下在kafka中使用了偏移量提交。
val offsetRanges=rdd.asInstanceOf[HasOffsetRanges].offsetRanges
inputStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
现在,当我运行程序时,它不会收到新消息,因为所有消息都已提交。
我正在QA中测试这段代码,所以想将偏移量重置为开始,但看起来最早的不起作用,它没有读取新消息,主题中也没有新消息。我想从头开始阅读消息,以进行测试。
如果最早提交的消息并没有从一开始就提取,有人能提供帮助吗?
只有当分区没有提交的偏移量时,才使用属性auto.offset.reset
。您可以使用kafka-consumer-groups
(作为Kafka的一部分(重置整个组的偏移量:
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute