我似乎无法在具体的avro实现中使用消息,我得到了以下异常:
class org.apache.avro.generic.GenericData$Record cannot be cast to class my.package.MyConcreteClass
这是代码(我使用Spring Boot
(
MyProducer.java
private final KafkaTemplate<String, MyConcreteClass> kafkaTemplate;
public PositionProducer(KafkaTemplate<String, MyConcreteClass> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(MyConcreteClass myConcreteClass) {
this.kafkaTemplate.send(topic, myConcreteClass);
}
MyConsumer.java
@KafkaListener(topics = "#{'${consumer.topic.name}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void listen(MyConcreteClass incomingMsg) {
//handle
}
请注意,如果我将所有内容都更改为GenericRecord
,则反序列化工作正常,因此我知道所有配置(未粘贴(都配置正确。
同样重要的是要注意,我没有自己注册模式,而是让我的客户端代码为我注册
有什么想法吗?
编辑:
配置:
@Bean
public ConsumerFactory<String, MyConcreteClass> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
return new DefaultKafkaConsumerFactory<>(props);
}
MyConcreteClass需要扩展SpecificRecord
您可以使用Avro maven插件从模式生成它
然后,您必须配置序列化程序以知道您想要使用特定的记录
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true") ;
除了OneCricketer的答案外,在设置特定的avro阅读器配置后,我还遇到了另一个java.lang.ClassCastException。它是nested exception is java.lang.ClassCastException: class my.package.Envelope cannot be cast to class my.package.Envelope (my.package.Envelope is in unnamed module of loader 'app'; my.package.Envelope is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @3be312bd);
。看起来spring-boot devtools将类封装在它的重载器模块中,导致jvm认为这是一个不同的类。
我删除了pom中的spring-boot开发工具,它终于按预期工作了。
<!-- Remove this from pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
我和您遇到了同样的问题。问题是,KafkaAvroDeserializer
将消息反序列化为GenericData$Record
,因此spring-kafka正在搜索用@KafkaListener
注释的类,以使用具有此类型参数的@KafkaHandler
方法。
您需要将此属性添加到spring-kafka配置中,以便反序列化程序可以直接返回您以前需要使用avro插件生成的SpecificRecord
类:
spring:
kafka:
properties:
specific.avro.reader: true
那么你的消费者可能会像这个
@KafkaListener(...)
public void consumeCreation(MyAvroGeneratedClass specificRecord) {
log.info("Consuming record: {}", specificRecord);
}
您需要自定义消费者配置。ContentDeserializer需要是一个KafkaAvroDeserialize,并引用您的模式注册表。