如何为Kafka KeyValuestore设置值DE/序列化器



我在keyvaluestore中存储了来自kafka主题的消息,以便以后可以查询它们。我创建一个KTable如下:

@StreamListener public void process(@Input("input") KTable<String,MyMessage> myMessages) {

我在我的应用程序中配置了消费者。yml如下:

更新了DE/serializer软件包

spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: com.me.MyMessageDeserializer key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: com.me.MyMessageSerializer

但是,当我从键Valuestore阅读时,键在字符串时正确返回,但返回的值是字节数组而不是myMessage。由于某种原因,我的自定义求职者没有被使用。我试图自己对消息进行挑选,但是我的避难所崩溃了。我在序列化器上放了一个断点,它从未被调用。对我来说,很明显,我的序列化器或避难所都没有使用。

我缺少哪种配置,以便使用自定义值DE/serialializer?DE/序列化需要在特定的软件包中找到吗?

在应用程序中使用了错误的配置键。而不是键驱动器:它应该是keyserde:而不是value-deserializer:应该是valueserde。以下是正确的配置:

spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde producer: keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde

相关内容

  • 没有找到相关文章

最新更新