我在keyvaluestore中存储了来自kafka主题的消息,以便以后可以查询它们。我创建一个KTable如下:
@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myMessages) {
我在我的应用程序中配置了消费者。yml如下:
更新了DE/serializer软件包
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
materializedAs: all-messages
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.me.MyMessageDeserializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.me.MyMessageSerializer
但是,当我从键Valuestore阅读时,键在字符串时正确返回,但返回的值是字节数组而不是myMessage。由于某种原因,我的自定义求职者没有被使用。我试图自己对消息进行挑选,但是我的避难所崩溃了。我在序列化器上放了一个断点,它从未被调用。对我来说,很明显,我的序列化器或避难所都没有使用。
我缺少哪种配置,以便使用自定义值DE/serialializer?DE/序列化需要在特定的软件包中找到吗?
在应用程序中使用了错误的配置键。而不是键驱动器:它应该是keyserde:而不是value-deserializer:应该是valueserde。以下是正确的配置:
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
materializedAs: all-messages
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: com.me.MyMessageSerde
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: com.me.MyMessageSerde