Kafka 不能不从开头读取 -Java 就不能消费



我是kafka的新手,并试图使用kafka构建生产者 - 消费者应用程序。在这里,我能够向 kalka 生成消息,但是当我尝试使用消费者将其消费回时,它返回 0 条记录。

我检查了我的消费者组的偏移量,我可以看到偏移量等于日志长度相同(在我的情况下为 1M - 与记录数相同)。

如果我在创建使用者时使用此配置属性,则从头开始读取。

configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

但我的要求是,如果我重新启动消费者,它应该像 AMQ 一样从之前的端点开始。

我在这里缺少什么吗?我认为只有在消费者民意调查之后,抵消才应该改变。为什么它在开头本身就设置为最大记录长度?

如链接所述,您需要考虑以下几种情况:

  1. 启动一个新的消费者(新group.id):对于这种情况,不会有提交的偏移量,因此消费者开始根据参数设置读取auto.offset.reset

  2. 重新启动使用者(重用group.id):在这种情况下,使用者将从中断的位置继续。 参数设置auto.offset.reset将被忽略。

因此,对于场景 (1),您只需"配置"您的起始位置。对于场景 (2),您的起始位置是"固定的"(即,始终是最后提交的偏移量),并且无法通过配置进行更改。但是,您始终可以在第一次致电poll()之前执行.seekToBeginning().seekToEnd(),并阅读整个主题或从主题末尾开始。对.seekXX()的调用将"覆盖"上次提交的偏移量,并允许您以您喜欢的任何偏移量开始使用。请注意,还有一些采用"偏移参数"的seek(),因此您可以指定要开始使用的任何偏移量。

相关内容

最新更新