即使没有达到Ack,Kafka偏移量也会增加



我有一个消费者,它消费一条消息,做一些繁重的工作,然后确认。

@KafkaListener(topics = "${kafka.topic}", groupId = "group", containerFactory ="ContainerFactory")
public void consumeMessage(@Payload Payload message, @Headers MessageHeaders headers, Acknowledgment ack) {
try {
//Heavy Job
ack.acknowledge();
} catch (Exception e) {
log("Error in Kafka Consumer);
}
}

现在,如果出现异常,它应该进入catch块,而Acknowledge不应该发生,如果Acknowledgement没有发生,它应该返回队列并再次处理。但这并没有发生。偏移量将更新,并拾取下一条消息。据我所知,消费者有一个投票大小,可以一次选择多条消息。但是,即使一条消息没有得到确认,它也应该重新处理它,而不是忽略它并更新偏移量。

这是Kafka消费者配置

`Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

这是底层KafkaConsumer的预期行为。

在封面下,KafkaConsumer使用pollAPI,在JavaDocs中描述为:

"在每次轮询中,使用者将尝试使用上次消耗的偏移量作为起始偏移量,并按顺序提取。最后使用的偏移量可以通过查找(TopicPartition,long(手动设置,也可以自动设置为订阅的分区列表的最后提交偏移量">

这意味着,它不检查最后一个提交的偏移量,而是检查最后消耗的偏移量,然后按顺序获取数据。只有在重新启动作业时,它才会继续读取该使用者组的上次提交偏移量,或者如果您使用基于auto_offset_reset配置的新使用者组。

为了解决您的问题,我看到以下解决方案,您可以在catch块中应用:

  • 而不是仅仅记录";Kafka Consumer中的错误";让你的工作停摆。修复代码并重新启动应用程序
  • 使用偏移量编号(导致异常(使用seekAPI再次将消费者重新定位到相同的偏移量。有关查找方法的详细信息,请点击此处

最新更新