Kafka Spring Deserialzer returnType静态方法从未调用



这是我得到的错误:

org.apache.kafka.common.errors.SerializationException: Error deserializing 
key/value for partition distance-0 at offset 0. If needed, please seek past 
the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers 
and no default type provided
at org.springframework.util.Assert.state(Assert.java:73)
at org. 

springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeselializer.java:370(

以及application-dev.yml:

spring:
json:
use:
type:
headers: false
value:
default:
type: Object
method: com.mycompany.mypackage.KafkaConfig.returnType

也尝试过:

consumer:
bootstrap-servers: 10.10.5.189:9092
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
enable-auto-commit: false
auto-offset-reset: earliest
properties:
spring:
json:
trusted:
packages: '*'
value:
default:
method: com.mycompany.mypackage.KafkaConfig.returnTypee

并没有任何警告,但我把printlin放在静态的方法,它从来没有开火?什么东西?

@KafkaListener(topics = "distance", groupId = "${kafka.myinfo.id}")
public void handle(CustDeletedEvent custDeletedEvent) {
log.debug("received jsonNode: "+ userDeletedEvent);

KafkaConfig.java

// NEVER CALLED!!!
public static JavaType returnType(byte[] data, Headers headers) {
System.out.println("return type called data.length="+data.length);
JavaType custDeletedEvent = 
TypeFactory.defaultInstance().constructType(CustDeletedEvent.class);
return custDeletedEvent;
}

最新配置:

kafka:
bootstrap-servers: 10.10.5.189:9092
producer:
bootstrap-servers: 10.10.5.189:9092
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
bootstrap-servers: 10.10.5.189:9092
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
enable-auto-commit: false
auto-offset-reset: earliest
properties:
spring:
json:
trusted:
packages: '*'
value:
method: mypackage.config.KafkaConfig.returnType

Spring Kafka在此配置

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

这是准确的yaml:

spring:
profiles:
active: dev
kafka:
bootstrap-servers: 10.10.5.189:9092
producer:
bootstrap-servers: 10.10.5.189:9092
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
bootstrap-servers: 10.10.5.189:9092
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
enable-auto-commit: false
auto-offset-reset: earliest
properties:
spring:
json:
trusted:
packages: '*'
value:
method: com.service.cust.impl.returnType

它需要进入spring.kafka.consumer.properties.spring.json...

最新更新