在transform()之后,KStream/KTable leftjoin会导致: StreamsException:



我写了下面的DSL:

myStream
        .leftJoin(myKtable, new MyValueJoiner())
        .groupByKey(Grouped.with(Serdes.String(),MyObject.serde()))
        .reduce((v1, v2) -> v2, Materialized.as("MY_STORE"))
        .toStream()

这工作正常,leftjoin(( 没问题,并且 reduce(( 很好地实现为一个状态存储,我可以在其上执行 put(( 和 delete((。

但是,如果我编写了实现转换器接口的MyTranformer 类并执行以下操作:

myOtherStream.transform(() -> new MyTransformer<>(), MY_STORE)
        .leftJoin(myOtherKTable, new MyOtherValueJoiner<>());

然后我得到以下异常:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.MyObject). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)

从Javadoc来看,leftjoin似乎使用默认的serdes,并且似乎没有办法强制使用自定义serdes,但是对于其他操作数来说,这是可能的。

但是,如果我在 transform(( 之后执行 leftjoin(( 以外的其他操作,例如 mapValue(( 或 filter((,它会按预期工作。但是一旦我执行 leftjoin((,我就会遇到强制转换异常。

我可以在转换((之后使用leftjoin((吗?

为什么在第一种情况下,即使文档说它使用默认的serdes,leftjoin((也可以工作,而对于转换器,它失败了?

最后,

我只是在Javadoc中错过了leftJoin((采用Joined参数来指定要使用的serdes:

myOtherStream.transform(() -> new MyTransformer<>(), MY_STORE)
    .leftJoin(myOtherKTable, new MyOtherValueJoiner<>(), Joined.with(JoinedKey.serde(), JoinedValue.serde(), JoinedValueOutput.serde()));

最新更新