重写KStream的默认序列化程序(ByteArraySerializer)



我似乎无法将主题的序列化程序重写为Serdes.String()。我正在尝试一个简单的用例,即从主题(流(中阅读,并向KTable中写作。到目前为止我所拥有的:

@Component
class Processor {
@Autowired
public void process(final StreamsBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
builder.stream("input_topic", Consumed.with(stringSerde, stringSerde))
.filter((key, value) -> value.contains("ACTION"))
.toTable(Materialized.as("output_table_materialized"))
.toStream().to("output_table", Produced.with(stringSerde, stringSerde)); // EDIT: added this last line
}
}

我得到的例外是:

org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

据我所知,它理解消息是String,但它使用默认的反序列化器ByteArraySerializer。上面的代码哪里出错了?

我遇到了类似的问题,解决方案是在Materialized实例上指定serdes,即交换

.toTable(Materialized.as("output_table_materialized"))

带有

.toTable(Materialized.as("output_table_materialized").withKeySerde(stringSerde).withValueSerde(stringSerde))

Consumer.withs将是De序列化程序。

错误出现在Serializer或toTable调用上,您可以添加Produced.with或修改应用程序属性以配置的默认值

相关内容

  • 没有找到相关文章

最新更新