Kafka concurrency config Spring setConsumerTaskExecutor and



Spring kafka 中的 setConsumerTaskExecutor() 和 setConcurrency() 有什么区别? 如果 ConsumerTaskExecutor 的最大池大小与 setConcurrency 值不同,会发生什么情况?

ThreadPoolTaskExecutor customExecutor= new ThreadPoolTaskExecutor();
exec.setCorePoolSize(3);
exec.setMaxPoolSize(6);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConcurrency(10);
factory.getContainerProperties().setConsumerTaskExecutor(customExecutor);

谢谢

concurrency

正在运行的最大并发 KafkaMessageListenerContainers 数。来自同一分区内的消息将按顺序处理。

ConsumerTaskExecutor

为轮询使用者的线程设置执行程序。

https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.html#getConcurrency()

https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/listener/ContainerProperties.html#setConsumerTaskExecutor(org.springframework.core.task.AsyncListenableTaskExecutor)


换句话说,ConcurrentKafkaListenerContainer会根据concurrency创建 1 个或多个KafkaMessageListenerContainers。每个KafkaMessageListenerContainer都共享ContainerProperties,使用ConsumerTaskExecutor来运行一个ListenerConsumer,委托给一个Kafka consumer进行投票。

相关内容

  • 没有找到相关文章

最新更新