Kafka 从相同的偏移量重新启动



我有一个 kafka 消费者,它连接到一个有 3 个分区的主题。一旦我从 kafka 获得记录,我就想捕获偏移量和分区。重新启动时,我想从上次读取偏移量恢复使用者的位置

来自 kafka 文档:

每条记录都有自己的偏移量,因此要管理自己的偏移量,您只需执行以下操作:

配置 enable.auto.commit=false

使用每个消费者记录提供的偏移量来保存您的 位置。

重新启动时,使用搜索恢复使用者的位置 (主题分区,长)。

这是我的示例代码:

constructor{    
load data into offsetMap<partition,offset>
initFlag=true;
}
Main method
{
ConsumerRecords<String, String> records = consumer.poll(100);
if(initFlag) // is this correct way to override offset position?
{
seekToPositions(offsetMap); 
initFlag=false;
}
while(!shutdown)
{
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
getOffsetPositions();// dump offsets and partitions to db/disk
}   
}
}
//get current offset and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{
Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
//code to put partition and offset into map
//write to disk or db
}
} // Overrides the fetch offsets that the consumer
public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
//code get partitions and offset from offsetMap
consumer.seek(partition, offset);
}

这是正确的做法吗?还有更好的方法吗?

如果您提交偏移量,Kafka 将为您存储它们(默认情况下最多 24 小时)。

这样,如果您的使用者死亡,您可以在另一台计算机上启动相同的代码,并从上次中断的地方继续。 无需外部存储。

请参阅 https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html 中的"偏移量和消费者位置">

并建议您考虑使用 commitSync

这对我来说没关系,只是要小心你的消费者是如何构建的(手动分区分配或自动)

如果分区分配是自动完成的,则需要特别注意处理分区分配更改的情况。这可以通过在调用订阅(Collection,ConsumerRebalanceListener)和订阅(Pattern,ConsumerRebalanceListener)中提供一个ConsumerRebalanceListener实例来完成。例如,当分区取自使用者时,使用者希望通过实现 ConsumerRebalanceListener.onPartitionsRevoked(Collection) 来提交这些分区的偏移量。当分区分配给使用者时,使用者将希望查找这些新分区的偏移量,并通过实现 ConsumerRebalanceListener.onPartitionsAssigned(Collection) 将使用者正确初始化到该位置。

https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

这可以通过控制我们提交偏移量来解决。

首先要做的是在消费者应用程序中将配置"enable.auto.commit"关闭为"false",这样您就可以控制何时提交偏移量。

我们使用 Map 手动跟踪偏移量,如下所示:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

consumer.subscribe(topic, new CommitCurrentOffset());
try {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// process the record (ex : save in DB / call external service etc..)
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null));  // 1
}
consumer.commitAsync(currentOffsets, null);  // 2
}
finally {
consumer.commitSync(currentOffsets);  // 3
}
class CommitCurrentOffset implements ConsumerRebalanceListener {  // 4
public void onPartitionRevoked(Collection<TopicPartition> topicPartitions) {
consumer.commitSync(currentOffsets);
consumer.close();
}
}
  1. 当我们处理每条消息时,我们添加映射中处理的消息的偏移量,如下所示:

    currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1, null)); 
    
  2. 我们将异步处理的消息的偏移量提交到代理。

  3. 如果在处理消息时出现任何错误/异常,我们将提交为每个分区处理的最新消息的偏移量。

  4. 当我们即将因重新平衡而丢失分区时,我们需要提交偏移量。在这里,我们提交已处理的最新偏移量(每个循环都在),而不是我们仍在处理的批处理中的最新偏移量。我们通过实现 ConsumerRebalanceListener 接口来实现这一点。每当触发重新平衡时,onPartitionRevoked() 方法将在重新平衡开始之前和消费者停止处理消息之后被调用。

最新更新