在PCF中部署的春季启动应用程序中提高Kafka侦听器的性能



我正在处理的用例需要每秒处理近600条消息(从主题订阅、转换、保存到SQL Server表并生成回主题(,但我们每5个实例每秒只处理100条消息。我们不能增加更多的实例来实现这一点。有什么建议会有帮助吗?

技术和基础设施:在PCF中部署了带有Kafka侦听器(无批处理侦听器(的spring引导应用程序。source和out主题各有10个分区。默认属性和设置正在使用。转换需要几毫秒。

我有一个类似的用例,我通过以下配置向每个侦听器添加并发性(10(并增加队列中的分区来提高性能

@Bean
public ThreadPoolTaskExecutor messageProcessorExecutor() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(poolSize);
exec.setMaxPoolSize(poolMaxSize);
exec.setKeepAliveSeconds(keepAlive);
return exec;
}
@Bean
public ConsumerFactory<String, Request> consumerFactory() {
DefaultKafkaConsumerFactory<String, Request> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs());
consumerFactory.setKeyDeserializer(new StringDeserializer());
consumerFactory.setValueDeserializer(new JsonDeserializer<>(Request.class));
return consumerFactory;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Request>> kafkaListenerContainerFactory(
ThreadPoolTaskExecutor messageProcessorExecutor,
ConsumerFactory<String, Request> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setConsumerTaskExecutor(messageProcessorExecutor);
return factory;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

最新更新