如何在Spring Kafka中包含用于反序列化的类型元数据



我正在Spring Kafka中的Listener上进行反序列化。但这是假设类型信息是由Spring Kafka制作人包含或发送的。在我的例子中,Json是由Debezium MySQLConnector发送的,它不添加这个元数据。所以我想把它添加到请求中。我知道它被放在JsonSerializer的某个请求中,我查看了源代码,但不知道如何在序列化期间将元数据类型添加到请求中。特别是哪个字段保存这种类型的信息?它是被序列化的java对象的类名吗?我认为仅仅设置一个默认的序列化程序是行不通的,因为正如人们所期望的那样,我有多个消费者在收听不同的主题。除了最简单的情况外,设置一个默认值是行不通的,因为我有很多反序列化的消费者和类型

Update尝试在反序列化程序上使用方法类型,但有另一个问题:Kafka Spring Deserialzer returnType静态方法从未调用

参见

public abstract class AbstractJavaTypeMapper implements BeanClassLoaderAware {
/**
* Default header name for type information.
*/
public static final String DEFAULT_CLASSID_FIELD_NAME = "__TypeId__";
/**
* Default header name for container object contents type information.
*/
public static final String DEFAULT_CONTENT_CLASSID_FIELD_NAME = "__ContentTypeId__";
/**
* Default header name for map key type information.
*/
public static final String DEFAULT_KEY_CLASSID_FIELD_NAME = "__KeyTypeId__";
/**
* Default header name for key type information.
*/
public static final String KEY_DEFAULT_CLASSID_FIELD_NAME = "__Key_TypeId__";
/**
* Default header name for key container object contents type information.
*/
public static final String KEY_DEFAULT_CONTENT_CLASSID_FIELD_NAME = "__Key_ContentTypeId__";
/**
* Default header name for key map key type information.
*/
public static final String KEY_DEFAULT_KEY_CLASSID_FIELD_NAME = "__Key_KeyTypeId__";

2组标题(键和值(。

TypeId用于简单类

如果TypeId是容器List<?>

ContentTypeId是包含的类型。

如果TypeIdMap

Key_TypeId是密钥类型。

这允许您重建Map<Foo, Bar>

这些头可以包含完全限定的类名,也可以包含通过classIdMappings映射映射到类名的标记。

然而,自从版本2.5以来,使用新的会更容易

使用方法确定类型。

这样,您就可以设置自己的头并在方法中检查它们。

编辑

这里有一个简单的例子:

@SpringBootApplication
public class Gitter76Application {
public static void main(String[] args) {
SpringApplication.run(Gitter76Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("gitter76").partitions(1).replicas(1).build();
}
@KafkaListener(id = "gitter76", topics = "gitter76")
public void listen(Foo in) {
System.out.println(in);
}
}
public class Foo {
private String bar;
public Foo() {
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.demo
$ kafkacat -P -b localhost:9092 -t gitter76 -H __TypeId__=com.example.demo.Foo
{"bar":"baz"}
^C
2020-08-08 09:32:10.034  INFO 58146 --- [ gitter76-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : gitter76: partitions assigned: [gitter76-0]
Foo [bar=baz]

最新更新