让我们假设enable.auto.commit=true,让我们假设我从中读取消息的主题有一些长时间的不活动(假设48小时没有消息(。结果连续的 poll(( 调用在 48 小时内不会返回任何消息,我的问题是:
最后返回的消息的偏移量(48 小时相同(是否会在__consumer_offsets主题中每个auto.commit.interval.ms一次又一次地提交,该主题是压缩的,其过期时间由offsets.retention.minutes控制?
一次又一次地提交将防止主题中的记录过期__consumer_offsets并在某个时候被删除。
这是一个有趣的问题。
编辑:根据最近的评论,更新此内容。更新的部分被删除线,并明确或斜体标记。
我会选择"否">"是">,即如果没有新消息到达主题,则不会一次又一次地提交最后返回的消息的偏移量。
这是相同的解释。
一个典型的消费者示例如下所示:
Properties props = new Properties();
<other-properties>
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
因此,偏移量提交的责任通常由使用者承担,并由轮询循环驱动。
现在,在上次提交后描述的场景中,对poll()
方法的每次调用都将返回一个空映射。因此,如果没有poll()
返回的记录,则没有要提交的新偏移量。
以下是我如何追踪 Kafka 的源代码并得出这个结论的。以下 return 语句来自此处给出的poll()
方法定义
return ConsumerRecords.empty();
此文件中可用的empty()
方法的定义。
编辑:以下部分是基于格温评论的新添加。
但是,在返回空映射之前,还有另一个poll()
方法(位于ConsumerCoordinator
类中(通过KafkaConsumer
类的poll()
方法调用,该方法根据此处给出的定义。 处理定期偏移量提交(如果通过以下方法启用
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
doAutoCommitOffsetsAsync();
}
}
我希望这有帮助!