Kafka消费者组消费者故障转移检测



我有一个kafka主题,只有一个分区。

在任何时间点,都可能有多个kafka客户端。所有客户端都是使用同一消费者组订阅的。因此,在任何给定的时间点,只有一个客户端会接收消息。假设从t0到t10,consumer1正在接收消息,但一段时间后它停止接收消息,consumer2被选为新的领导者(可能是因为consumer1中的GC暂停(。在我的consumer1中,我想检测这种故障转移何时发生,这样它就可以刷新其本地状态。

是否可以使用kafka客户端代码?

可以使用ConsumerRebalanceListener接口中可用的onPartitionsRevoked回调方法。

根据描述,

用户可以实现的回调方法,用于处理向自定义存储的偏移提交。当使用者必须放弃某些分区时,将在重新平衡操作期间调用此方法。当消费者关闭或取消预订时,也可以调用它。建议在此回调中将偏移量提交给Kafka或自定义偏移量存储,以防止重复数据。

示例实现:

private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
// Add your changes here to flush
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
}
}

示例:

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class TestConsumer {

KafkaConsumer<String, String> kafkaConsumer;

public static void main(String[] args) {
TestConsumer consumer = new TestConsumer();
consumer.pollMessages();
}
public TestConsumer() {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-example-consumer");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("input-topic"), new ConsumerPartitionAssignmentListener());
}

public void pollMessages() {
while(true) {
System.out.println("Polling");
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(5000));
System.out.println(records.count());
}
}

private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
// Add your changes here to flush
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
}
}
}

输出:

Poll
Partitions assignment listener: [input-topic-0]
0
Poll
0
Poll
Partitions revoke listener: [input-topic-0]
Partitions assignment listener: [input-topic-0]
0
Poll
0

最新更新