我正在使用Kafka Streams 2.6.0(在Spring Boot中(,遇到了非常奇怪的问题。我正在尝试对流执行有状态操作(分组和聚合(:
freeTextSignPartialUpdateStream
.groupBy((key, value) -> value.getObjectId(), Grouped.with(Serdes.Long(), FTS_PARTIAL_UPDATE_MSG_SERDE))
.aggregate(
ArrayList::new,
freeTextSignUtils::updateFreeTextSignUpdateList,
Materialized.<Long, List<FreeTextSignPartialUpdate>>as(Stores.inMemoryKeyValueStore("STORE_NAME"))
.withKeySerde(Serdes.Long())
.withValueSerde(FTS_PARTIAL_UPDATE_LIST_SERDE)
.withCachingDisabled()
.withLoggingDisabled()
)
.toStream()
.to(
storesService.getFreeTextSignUpdatesStoreTopicName(),
Produced.with(Serdes.Long(), FTS_PARTIAL_UPDATE_LIST_SERDE)
);
FTS_PARTIAL_UPDATE_LIST
是一个适当的Serdes实现,定义为常量(类似于工作时没有任何问题的FTS_PARTIAL_UPDATE_MSG_SERDE
(。
奇怪的是,我没有得到ser/des错误,而是完全忽略了withValueSerde
中定义的值,而是使用了StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG
(当前设置为JsonSerdes
(。
我在应用程序的其他流处理部分使用了与上面类似的模式,没有任何问题。此外,当我用我的FTS_PARTIAL_UPDATE_LIST
替换默认的JsonSerdes
时,这是有效的。
我查看了withValueSerde
文档,其中说当输入值为null
时,它将回退到默认的Serdes,这显然不是(我在调试器中检查过(。
实际上,这是我的错误,因为SerDes实现中的反序列化程序返回null。