Spring kafka 中的 setConsumerTaskExecutor() 和 setConcurrency() 有什么区别? 如果 ConsumerTaskExecutor 的最大池大小与 setConcurrency 值不同,会发生什么情况?
ThreadPoolTaskExecutor customExecutor= new ThreadPoolTaskExecutor();
exec.setCorePoolSize(3);
exec.setMaxPoolSize(6);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConcurrency(10);
factory.getContainerProperties().setConsumerTaskExecutor(customExecutor);
谢谢
concurrency
:
正在运行的最大并发 KafkaMessageListenerContainers 数。来自同一分区内的消息将按顺序处理。
ConsumerTaskExecutor
:
为轮询使用者的线程设置执行程序。
https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.html#getConcurrency()
https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/listener/ContainerProperties.html#setConsumerTaskExecutor(org.springframework.core.task.AsyncListenableTaskExecutor)
换句话说,ConcurrentKafkaListenerContainer
会根据concurrency
创建 1 个或多个KafkaMessageListenerContainers
。每个KafkaMessageListenerContainer
都共享ContainerProperties
,使用ConsumerTaskExecutor
来运行一个ListenerConsumer
,委托给一个Kafka consumer
进行投票。