春云卡夫卡根据报头信息流式传输动态消息转换



我正在尝试使用 Spring Cloud Kafka Streams 来处理包含不同类型消息的 Kafka 主题的消息。例如,我们从主题收到一条 JSON 消息,该消息可以是 A 型或 B 型消息。生产者在标头中添加消息类型,有没有办法在功能绑定器中读取该标头信息并相应地转换消息?或者是否有一个"选择"选项,用于在消息进入时进行分支,以将消息路由到正确的转换器?

如果将绑定配置为使用nativeDecoding,反序列化由 Kafka 完成(通过value.deserializer消费者属性(。

spring-kafka 提供了一个JsonDeserializer,用于在特定标头中查找类型信息(由相应的JsonSerializer设置。

它还提供了一个DelegatingDeserializer,允许您根据spring.kafka.serialization.selector标头中的值选择要使用的反序列化程序。

有关更多信息,请参阅 Spring for Apache Kafka Reference Manual。

最新更新