Spring Kafka with multiple Partition & Multiple listener



我想为一个主题设置多个分区,比如说3个分区。此外,它将在同一组中有3个消费者从这些分区中读取(根据kafka设计1-1映射(。

我的配置如下:

@Bean
public ConcurrentMessageListenerContainer<String, String> kafkaMessageContainer() throws Exception {

//Setting 3 Partition
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic, 0), new TopicPartitionOffset(topic, 1), new TopicPartitionOffset(topic, 2));
//Group id is set same for multiple instances of this service
containerProps.setGroupId(groupId);

ConcurrentMessageListenerContainer<String, String> factory = new ConcurrentMessageListenerContainer <String, String>(kafkaConsumerFactory(kafkaBootStrapServers), containerProps);

factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(KAFKA_RETRY_INTERVAL, KAFKA_RETRY_MAX_ATTEMPT)));
//Setting 3 listeners
factory.setConcurrency(3);

return factory;
}
private ConsumerFactory<String, String> kafkaConsumerFactory(String kafkaBootStrapServers) {

Map<String, Object> config = new HashMap<String, Object>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

return new DefaultKafkaConsumerFactory<String, String>(config);
}

当我从服务的单个实例运行这个时,侦听器可以很好地从各自的主题分区读取消息(没有重复(。但当我创建了两个服务实例时,来自这两个实例的侦听器将再次读取同一主题分区。这会导致重复处理同一条消息。

我的问题是,如果我在两个实例中都没有更改侦听器的组id,那么为什么不同实例中的侦听器再次读取相同的消息。此外,我如何才能确保它在单个实例中正常工作。感谢

ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic, 0), new TopicPartitionOffset(topic, 1), new TopicPartitionOffset(topic, 2));

您正在手动分配分区,因此group.id与分配目的无关。

如果您想使用组管理将分区分布在多个实例中,您只需要使用ContainerProperties containerProps = new ContainerProperties(topic);

对于并发性为3的2个实例,您至少需要6个分区;否则就会有闲置的消费者。

当只有一个实例时,它将获得所有6个分区。