我使用ConcurrentKafkaListenerContainerFactory,如下所示:
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(40);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
我也有多个特定主题的听众:
@KafkaListener(id = "id1", topicPattern = "test1.*")
public void listenTopic1(ConsumerRecord<String, String> record) {
System.out.println("Topic: " + record.topic());
}
@KafkaListener(id = "id2", topicPattern = "test2.*")
public void listenTopic2(ConsumerRecord<String, String> record) {
System.out.println("Topic: " + record.topic());
}
我正在设置的并发性,是特定于某个侦听器还是所有侦听器?注意:所有主题都有40个分区。
某些主题的负载比其他主题大。
每个容器将获得40个使用者线程。工厂为每个侦听器创建一个具有相同属性的容器。
您的主题至少需要40个分区才能有效,因为一个分区只能由一个组中的一个使用者使用。