忽略在withValueSerde中定义的KafkaStreams自定义SerDes



我正在使用Kafka Streams 2.6.0(在Spring Boot中(,遇到了非常奇怪的问题。我正在尝试对流执行有状态操作(分组和聚合(:

freeTextSignPartialUpdateStream
.groupBy((key, value) -> value.getObjectId(), Grouped.with(Serdes.Long(), FTS_PARTIAL_UPDATE_MSG_SERDE))
.aggregate(
ArrayList::new,
freeTextSignUtils::updateFreeTextSignUpdateList,
Materialized.<Long, List<FreeTextSignPartialUpdate>>as(Stores.inMemoryKeyValueStore("STORE_NAME"))
.withKeySerde(Serdes.Long())
.withValueSerde(FTS_PARTIAL_UPDATE_LIST_SERDE)
.withCachingDisabled()
.withLoggingDisabled()
)
.toStream()
.to(
storesService.getFreeTextSignUpdatesStoreTopicName(),
Produced.with(Serdes.Long(), FTS_PARTIAL_UPDATE_LIST_SERDE)
);

FTS_PARTIAL_UPDATE_LIST是一个适当的Serdes实现,定义为常量(类似于工作时没有任何问题的FTS_PARTIAL_UPDATE_MSG_SERDE(。

奇怪的是,我没有得到ser/des错误,而是完全忽略了withValueSerde中定义的值,而是使用了StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG(当前设置为JsonSerdes(。

我在应用程序的其他流处理部分使用了与上面类似的模式,没有任何问题。此外,当我用我的FTS_PARTIAL_UPDATE_LIST替换默认的JsonSerdes时,这是有效的。

我查看了withValueSerde文档,其中说当输入值为null时,它将回退到默认的Serdes,这显然不是(我在调试器中检查过(。

实际上,这是我的错误,因为SerDes实现中的反序列化程序返回null。

相关内容

  • 没有找到相关文章

最新更新