我似乎无法将主题的序列化程序重写为Serdes.String()
。我正在尝试一个简单的用例,即从主题(流(中阅读,并向KTable中写作。到目前为止我所拥有的:
@Component
class Processor {
@Autowired
public void process(final StreamsBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
builder.stream("input_topic", Consumed.with(stringSerde, stringSerde))
.filter((key, value) -> value.contains("ACTION"))
.toTable(Materialized.as("output_table_materialized"))
.toStream().to("output_table", Produced.with(stringSerde, stringSerde)); // EDIT: added this last line
}
}
我得到的例外是:
org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
据我所知,它理解消息是String
,但它使用默认的反序列化器ByteArraySerializer
。上面的代码哪里出错了?
我遇到了类似的问题,解决方案是在Materialized
实例上指定serdes,即交换
.toTable(Materialized.as("output_table_materialized"))
带有
.toTable(Materialized.as("output_table_materialized").withKeySerde(stringSerde).withValueSerde(stringSerde))
Consumer.withs将是De序列化程序。
错误出现在Serializer或toTable调用上,您可以添加Produced.with
或修改应用程序属性以配置的默认值