我有一个spring-boot Kafka监听器,它监听单个主题和消费者列表。现在这个春季启动应用程序现在有3个节点。现在,我需要确保List消息的数量对所有3个节点都是均匀的负载平衡。如何在我的应用程序中实现这一点?
在Consumer中,group.id(ConsumerConfig.group_id_CONFIG(配置提供负载平衡。发布到主题的每条记录都会传递到每个订阅使用者组中的一个使用者实例。
[group.id][1]:
一个唯一字符串,用于标识此消费者的消费者组属于。如果使用者使用使用订阅(主题(或基于卡夫卡的抵销管理策略。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfiguration {
private Map<String, Object> consumerConfigs() {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your brokers");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumeer-group-id");
return props;
}
@Bean
public ConsumerFactory<String, YourClass> kafkaListenerConsumerFactory() {
final ErrorHandlingDeserializer<YourClass> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(new JsonDeserializer<>(YourClass.class, false));
return new DefaultKafkaConsumerFactory<>(this.consumerConfigs(), new StringDeserializer(), errorHandlingDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, YourClass> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, YourClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(this.kafkaListenerConsumerFactory());
return factory;
}
}
[1]: https://kafka.apache.org/documentation/#consumerconfigs_group.id