使用模式注册表将Kafka-Avro序列化/反序列化为具体类型失败



我似乎无法在具体的avro实现中使用消息,我得到了以下异常:

class org.apache.avro.generic.GenericData$Record cannot be cast to class my.package.MyConcreteClass

这是代码(我使用Spring Boot(

MyProducer.java

private final KafkaTemplate<String, MyConcreteClass> kafkaTemplate;
public PositionProducer(KafkaTemplate<String, MyConcreteClass> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(MyConcreteClass myConcreteClass) {
this.kafkaTemplate.send(topic, myConcreteClass);
}

MyConsumer.java

@KafkaListener(topics = "#{'${consumer.topic.name}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void listen(MyConcreteClass incomingMsg) {
//handle
}

请注意,如果我将所有内容都更改为GenericRecord,则反序列化工作正常,因此我知道所有配置(未粘贴(都配置正确。

同样重要的是要注意,我没有自己注册模式,而是让我的客户端代码为我注册

有什么想法吗?

编辑:

配置:

@Bean
public ConsumerFactory<String, MyConcreteClass> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
return new DefaultKafkaConsumerFactory<>(props);
}

MyConcreteClass需要扩展SpecificRecord

您可以使用Avro maven插件从模式生成它

然后,您必须配置序列化程序以知道您想要使用特定的记录

props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true") ;

除了OneCricketer的答案外,在设置特定的avro阅读器配置后,我还遇到了另一个java.lang.ClassCastException。它是nested exception is java.lang.ClassCastException: class my.package.Envelope cannot be cast to class my.package.Envelope (my.package.Envelope is in unnamed module of loader 'app'; my.package.Envelope is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @3be312bd);。看起来spring-boot devtools将类封装在它的重载器模块中,导致jvm认为这是一个不同的类。

我删除了pom中的spring-boot开发工具,它终于按预期工作了。

<!-- Remove this from pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>

我和您遇到了同样的问题。问题是,KafkaAvroDeserializer将消息反序列化为GenericData$Record,因此spring-kafka正在搜索用@KafkaListener注释的类,以使用具有此类型参数的@KafkaHandler方法。

您需要将此属性添加到spring-kafka配置中,以便反序列化程序可以直接返回您以前需要使用avro插件生成的SpecificRecord类:

spring:
kafka:
properties:
specific.avro.reader: true

那么你的消费者可能会像这个

@KafkaListener(...)
public void consumeCreation(MyAvroGeneratedClass specificRecord) {
log.info("Consuming record: {}", specificRecord);
}

您需要自定义消费者配置。ContentDeserializer需要是一个KafkaAvroDeserialize,并引用您的模式注册表。

最新更新