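I have topics written by Kafka Connect using the Glue Schema Registry in AVRO GENERIC_RECORD format. I can consume these records with a plain Java program. However, I am having trouble reading and consuming them from a Spring Boot application.
My simple configuration class: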
@EnableKafka
@Configuration
public class KafkaAvroConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    // Creating a Listener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        // Creating a Map of string-object pairs
        Map<String, Object> config = new HashMap<>();
        // Adding the Configuration
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        config.put(AWSSchemaRegistryConstants.AWS_REGION, region);
        config.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
        config.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
        config.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS,
                MySchemaNamingStrategy.class.getName());
        return config;
    }
}
And the listener class:
@Component
public class KafkaAvroConsumer {

    @Autowired
    KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate;

    @KafkaListener(topics = "gsr1.HR.DEPARTMENTS")
    public void listenDepartment(ConsumerRecord<GenericRecord, GenericRecord> record) {
        //System.out.println("DEPARTMENTS key schema = " + record.key().getSchema().toString());
        GenericRecord key = record.key();
        GenericRecord value = record.value();
        System.out.println(" record.key() = " + key);
        System.out.println(" record.value() = " + value);
        System.out.println(" Key DEPARTMENT_ID = " + key.get("DEPARTMENT_ID"));
        System.out.println(" DEPARTMENT_NAME = " + (String) value.get("DEPARTMENT_NAME"));
    }
}
This gives me an error at "GenericRecord key = record.key();". It looks like the records are not being deserialized to GenericRecord; instead, they seem to be just raw bytes:
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.avro.generic.GenericRecord (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.avro.generic.GenericRecord is in unnamed module of loader 'app')
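I looked around, and according to the Spring docs the DefaultKafkaConsumerFactory constructor can also take the key and value deserializers as arguments. So I tried the following, but it does not compile. GlueSchemaRegistryKafkaDeserializer does not accept a type parameter either.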
public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
    Deserializer<GenericRecord> avroDeser = new GlueSchemaRegistryKafkaDeserializer();
    avroDeser.configure(consumerConfigs(), false);
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), avroDeser, avroDeser);
}
Any help on how to get this working would be appreciated. I have also raised this question on the GSR GitHub: https://github.com/awslabs/aws-glue-schema-registry/issues/241
Here is the POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.1</version>
        <relativePath/>
    </parent>
    <groupId>com.test</groupId>
    <artifactId>SpringBootKafkaAvro</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootKafkaAvro</name>
    <description>Spring boot Kafka Avro using Glue Schema registry</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.glue</groupId>
            <artifactId>schema-registry-serde</artifactId>
            <version>1.1.14</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
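I figured out what the problem was. In the configuration class, Spring Boot expects the listener container factory bean to be named kafkaListenerContainerFactory. I had named it concurrentKafkaListenerContainerFactory, so my consumer and Glue configuration were never picked up. The listener most likely fell back to Spring Boot's auto-configured factory, whose default StringDeserializer would explain the ClassCastException on java.lang.String.
By default, @KafkaListener looks for a bean named kafkaListenerContainerFactory unless a containerFactory is set on the annotation.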
https://docs.spring.io/spring-kafka/docs/current/reference/html/kafka-listener-annotation
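For reference, here is a minimal sketch of the renamed factory bean, assuming the rest of the setup from the question stays the same. The property keys aws.region and aws.glue.registry-name are hypothetical (the question does not show where region and registryName come from), and the custom schema naming strategy is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;

@EnableKafka
@Configuration
public class KafkaAvroConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    // Hypothetical property keys; use whatever your application defines.
    @Value("${aws.region}")
    private String region;

    @Value("${aws.glue.registry-name}")
    private String registryName;

    // The method name is the bean name. @KafkaListener looks up
    // "kafkaListenerContainerFactory" by default, so this factory is now used.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Both key and value are Avro records resolved through the Glue Schema Registry.
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        config.put(AWSSchemaRegistryConstants.AWS_REGION, region);
        config.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
        config.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
        return new DefaultKafkaConsumerFactory<>(config);
    }
}
```

If you would rather keep the original bean name, it should also work to point the listener at it explicitly with @KafkaListener(topics = "gsr1.HR.DEPARTMENTS", containerFactory = "concurrentKafkaListenerContainerFactory").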