启用 Kafka 自动提交时,相同的偏移量是否会多次提交?



让我们假设enable.auto.commit=true,让我们假设我从中读取消息的主题有一些长时间的不活动(假设48小时没有消息(。结果连续的 poll(( 调用在 48 小时内不会返回任何消息,我的问题是:

最后返回的消息的偏移量(48 小时相同(是否会在__consumer_offsets主题中每个auto.commit.interval.ms一次又一次地提交,该主题是压缩的,其过期时间由offsets.retention.minutes控制?

一次又一次地提交将防止主题中的记录过期__consumer_offsets并在某个时候被删除。

这是一个有趣的问题。

编辑:根据最近的评论,更新此内容。更新的部分被删除线,并明确或斜标记。

我会选择"否">"是">,即如果没有新消息到达主题,则不会一次又一次地提交最后返回的消息的偏移量。

这是相同的解释。

一个典型的消费者示例如下所示:

Properties props = new Properties();
<other-properties>
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

因此,偏移量提交的责任通常由使用者承担,并由轮询循环驱动。

现在,在上次提交后描述的场景中,对poll()方法的每次调用都将返回一个空映射。因此,如果没有poll()返回的记录,则没有要提交的新偏移量。

以下是我如何追踪 Kafka 的源代码并得出这个结论的。以下 return 语句来自此处给出的poll()方法定义

return ConsumerRecords.empty();

此文件中可用的empty()方法的定义。

编辑:以下部分是基于格温评论的新添加。

但是,在返回空映射之前,还有另一个poll()方法(位于ConsumerCoordinator类中(通过KafkaConsumer类的poll()方法调用,该方法根据此处给出的定义。 处理定期偏移量提交(如果通过以下方法启用

(:
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
doAutoCommitOffsetsAsync();
}
}

我希望这有帮助!

最新更新