我们有一个带有自定义反序列化类的 Spring Kafka 应用程序,我们正在使用 @KafkaListener 注解来接收消息。我们在自定义反序列化器中添加了一条日志语句,注意到批处理(批大小为 5)中确实读取到了预期数量的消息,但是带 @KafkaListener 注解的方法只收到了该批次中的第一条消息。
这里是我们的Kafka配置
package com.aa.ctlctr.processor.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import com.aa.opshub.msgnode.flight.event.json.model.Flight;
@EnableKafka
@Configuration
@PropertySource(value = "classpath:application.yml")
@EnableConfigurationProperties
public class KafkaSourceConfig {

    private static final Logger logger = Logger.getLogger(KafkaSourceConfig.class);

    // FIX: the original placeholder "${spring.kafka.bootstrap-servers" was missing
    // its closing brace, so the literal string (not the property value) was injected.
    @Value("${spring.kafka.bootstrap-servers}")
    private String brokerConnect;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutocommit;

    // FIX: this field is used as ConsumerConfig.GROUP_ID_CONFIG below, but the
    // original bound it to ${spring.kafka.listener.ack-mode} (i.e. the string
    // "batch"), which is not a group id. A default is supplied so the context
    // still starts when the property is absent; note that the @KafkaListener
    // annotation overrides this with its own groupId at the listener level.
    @Value("${spring.kafka.consumer.group-id:default-consumer-group}")
    private String groupIdConfig;

    // Kafka accepts numeric configs as strings, so String is acceptable here.
    @Value("${spring.kafka.consumer.properties.max.poll.records:5}")
    private String maxPollRecordConfig;

    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String saslJaasConfig;

    @Value("${spring.kafka.properties.sasl.login.callback.handler.class}")
    private String saslClientCallbackHandlerClass;

    /**
     * Raw consumer configuration map: broker address, deserializers,
     * commit/group/poll settings and the SASL_SSL security properties.
     */
    @Bean
    public Map<String, Object> consumer_Configs() {
        Map<String, Object> prop = new HashMap<>();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnect);
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaCustomDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutocommit);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordConfig);
        prop.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        prop.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        // FIX: SASL_JAAS_CONFIG was put twice in the original; the duplicate is removed.
        prop.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
        prop.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, saslClientCallbackHandlerClass);
        prop.put("ssl.engine.factory.class", InsecureSslEngineFactory.class);
        return prop;
    }

    /**
     * Consumer factory for String keys / Flight values.
     * NOTE(review): because deserializer INSTANCES are passed here, the
     * *_DESERIALIZER_CLASS_CONFIG entries in consumer_Configs() are ignored
     * for this factory — the instances take precedence.
     */
    @Bean
    public ConsumerFactory<String, Flight> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumer_Configs(), new StringDeserializer(),
                new KafkaCustomDeserializer<>());
    }

    /**
     * Listener container factory configured for BATCH delivery.
     * IMPORTANT: with setBatchListener(true), every @KafkaListener method using
     * this factory must declare List&lt;Flight&gt; (and List-typed @Header
     * parameters), otherwise only the first record of each polled batch is
     * delivered to a single-entity method signature.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Flight> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Flight> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
}
应用yaml
spring:
  kafka:
    #bootstrap-servers: ${kafka.bootstrap.servers}
    bootstrap-servers: <<broker address>>
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        mechanism: OAUTHBEARER
        jaas:
          config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
        login:
          callback:
            handler:
              class: <<Security call back handler>>
      max.request.size: 750000
      request.timeout.ms: 30000
      linger.ms: 500
      delivery.timeout.ms: 91500
      metadata.max.age.ms: 180000
      connections.max.idle.ms: 60000
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      properties:
        max.poll.records: 5
        partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
    listener:
      # FIX: was "single", which contradicts factory.setBatchListener(true) in the
      # Java config — batch consumption requires the batch listener type and a
      # List-typed @KafkaListener method parameter.
      type: batch
      ack-mode: batch
KafkaListener配置
@KafkaListener(topics = "#{'${my.kafka.conf.topics}'.split(',')}", concurrency = "${my.kafka.conf.concurrency}", clientIdPrefix = "${my.kafka.conf.clientIdPrefix}", groupId = "${my.kafka.conf.groupId}")
public void kafkaListener(final Flight flight,@Header(KafkaHeaders.OFFSET) Long offset,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partitionId,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp) throws JsonMappingException, JsonProcessingException {
再次仔细查看文档后可以发现:对于批量监听器(batch listener),@KafkaListener 方法的参数必须声明为 List 类型(例如 List&lt;Flight&gt;),而不是反序列化类型的单个对象;相应的 @Header 参数也应声明为 List。这正是只收到批次中第一条消息的原因。参考:
https://docs.spring.io/spring-kafka/reference/html/batch-listeners