无法使用泛型反序列化动态json



我正在尝试在Java中使用Jackson解析Debezium CDC消息。但是,在反序列化之后访问结果对象时,我得到了一个类型转换异常(ClassCastException)。我之所以使用泛型,是因为目标对象是动态的,会随Kafka主题的不同而变化——MySQL中的每张表都对应一个不同的主题。

Json输入

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"
before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"Anne","last_name":"old and new","email":"annek@noanswer.org"},"source":{"version":"1.4.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1614335758000,"snapshot":"false","db":"inventory","table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2150,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1614335758726,"transaction":null}}

根pojo

/**
 * Top-level envelope of a Debezium CDC message: a "schema" part and a
 * "payload" part. JSON properties other than these two are ignored.
 *
 * @param <S> type the "schema" property is bound to
 * @param <D> type of the row images held by the payload
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class DebeziumCDCMessage<S, D> {

    private S schema;
    private DebeziumPayload<D> payload;

    /**
     * Creator used by Jackson to build the envelope from JSON.
     *
     * @param schema  value of the "schema" property
     * @param payload value of the "payload" property
     */
    @JsonCreator
    DebeziumCDCMessage(
            @JsonProperty("schema") S schema,
            @JsonProperty("payload") DebeziumPayload<D> payload) {
        this.schema = schema;
        this.payload = payload;
    }

    /** @return the schema part of the message */
    public S getSchema() {
        return this.schema;
    }

    /** @param schema the schema part to store */
    public void setSchema(S schema) {
        this.schema = schema;
    }

    /** @return the payload part of the message */
    public DebeziumPayload<D> getPayload() {
        return this.payload;
    }

    /** @param payload the payload part to store */
    public void setPayload(DebeziumPayload<D> payload) {
        this.payload = payload;
    }
}

DebeziumPayload Pojo

/**
 * Payload section of a Debezium CDC message: the row image before and after
 * the change, the operation code, a timestamp and the source descriptor.
 * JSON properties not listed here are ignored.
 *
 * @param <D> type the "before" and "after" row images are bound to
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class DebeziumPayload<D> {

    private D before;
    private D after;
    private String op;
    // NOTE(review): the sample message carries ts_ms as a JSON number while this
    // field is a String; this relies on Jackson's scalar coercion - confirm intended.
    private String ts_ms;
    private Object source;

    /**
     * Creator used by Jackson to build the payload from JSON.
     *
     * @param before value of the "before" property
     * @param after  value of the "after" property
     * @param op     value of the "op" property
     * @param ts_ms  value of the "ts_ms" property
     * @param source value of the "source" property
     */
    @JsonCreator
    DebeziumPayload(
            @JsonProperty("before") D before,
            @JsonProperty("after") D after,
            @JsonProperty("op") String op,
            @JsonProperty("ts_ms") String ts_ms,
            @JsonProperty("source") Object source) {
        this.before = before;
        this.after = after;
        this.op = op;
        this.ts_ms = ts_ms;
        // Assign directly, like the other fields: calling the public, overridable
        // setter from a constructor would run subclass code on a half-built object.
        this.source = source;
    }

    /** @return the row image before the change */
    public D getBefore() {
        return before;
    }

    /** @param before the row image before the change */
    public void setBefore(D before) {
        this.before = before;
    }

    /** @return the row image after the change */
    public D getAfter() {
        return after;
    }

    /** @param after the row image after the change */
    public void setAfter(D after) {
        this.after = after;
    }

    /** @return the operation code */
    public String getOp() {
        return op;
    }

    /** @param op the operation code */
    public void setOp(String op) {
        this.op = op;
    }

    /** @return the "ts_ms" value as a string */
    public String getTs_ms() {
        return ts_ms;
    }

    /** @param ts_ms the "ts_ms" value as a string */
    public void setTs_ms(String ts_ms) {
        this.ts_ms = ts_ms;
    }

    /** @return the untyped "source" descriptor */
    public Object getSource() {
        return source;
    }

    /** @param source the untyped "source" descriptor */
    public void setSource(Object source) {
        this.source = source;
    }
}

before/after 字段对应的Pojo(仅适用于上面给出的JSON示例,每个主题都会有不同的目标对象)

/**
 * Row image of a customer record with the columns id, first_name, last_name
 * and email. JSON properties other than these four are ignored.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer {

    private Integer id;
    private String first_name;
    private String last_name;
    private String email;

    /**
     * Creator used by Jackson to build a customer from JSON.
     *
     * @param id        value of the "id" property
     * @param firstName value of the "first_name" property
     * @param lastName  value of the "last_name" property
     * @param email     value of the "email" property
     */
    @JsonCreator
    Customer(
            @JsonProperty("id") Integer id,
            @JsonProperty("first_name") String firstName,
            @JsonProperty("last_name") String lastName,
            @JsonProperty("email") String email) {
        this.id = id;
        this.first_name = firstName;
        this.last_name = lastName;
        this.email = email;
    }

    /** @return the customer id */
    public Integer getId() {
        return this.id;
    }

    /** @param id the customer id */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the first name */
    public String getFirst_name() {
        return this.first_name;
    }

    /** @param first_name the first name */
    public void setFirst_name(String first_name) {
        this.first_name = first_name;
    }

    /** @return the last name */
    public String getLast_name() {
        return this.last_name;
    }

    /** @param last_name the last name */
    public void setLast_name(String last_name) {
        this.last_name = last_name;
    }

    /** @return the e-mail address */
    public String getEmail() {
        return this.email;
    }

    /** @param email the e-mail address */
    public void setEmail(String email) {
        this.email = email;
    }
}

最终反序列化代码

@Service
public class CustomersCDCConsumer {
@SuppressWarnings("unchecked")
@KafkaListener(topics = "dbserver1.inventory.customers", groupId = "group_id")
public void listenGroupFoo(String message) {
try {
DebeziumCDCMessage<Object,Customer> respo=new ObjectMapper().readValue(message, DebeziumCDCMessage.class);
DebeziumPayload<Customer> customer=respo.getPayload();
System.out.println("data as recieved="+customer.getAfter().getLast_name());
} catch (JsonMappingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

运行代码时出现错误。

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.licious.kafa2sfwrapper.kafkacosumers.CustomersCDCConsumer.listenGroupFoo(java.lang.String)' threw exception; nested exception is java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.licious.kafa2sfwrapper.model.tables.Customer (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.licious.kafa2sfwrapper.model.tables.Customer is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @5010be6); nested exception is java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.licious.kafa2sfwrapper.model.tables.Customer (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.licious.kafa2sfwrapper.model.tables.Customer is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @5010be6)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2114) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2102) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2001) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.licious.kafa2sfwrapper.model.tables.Customer (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.licious.kafa2sfwrapper.model.tables.Customer is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @5010be6)
at com.licious.kafa2sfwrapper.kafkacosumers.CustomersCDCConsumer.listenGroupFoo(CustomersCDCConsumer.java:22) ~[classes!/:0.0.1-SNAPSHOT]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.3.4.jar!/:5.3.4]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.3.4.jar!/:5.3.4]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:330) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051) ~[spring-kafka-2.6.6.jar!/:2.6.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988) ~[spring-kafka-2.6.6.jar!/:2.6.6]
... 8 common frames omitted

好的,我其实已经很接近了,只需要把泛型目标类的类型信息告诉Jackson。基本上就是把下面的第一行代码替换为第二行:

DebeziumCDCMessage<Object,Customer> respo=new ObjectMapper().readValue(message, DebeziumCDCMessage.class);

DebeziumCDCMessage<Object,Customer> respo=new ObjectMapper().readValue(message, new TypeReference<DebeziumCDCMessage<Object,Customer>>() {});

与其使用泛型数据类型,我建议你为不同的消息创建一个包含公共字段的基类,以及多个子pojo类。

并在基类上添加:

@JsonSubTypes

注解,并在其中指定所有子类,这样Jackson就能找到合适的子类型并反序列化json(通常还需要配合 @JsonTypeInfo 注解,指明用哪个字段来区分子类型)。

可以在这里找到一个简单的例子:https://www.tutorialspoint.com/jackson_annotations/jackson_annotations_jsonsubtypes.htm

最新更新