试图加入两个 Kstream,但我收到类型不匹配错误 下面是代码片段。
KStream<String, String> longCounts = netExpence.join(netIncome, (key1, key2) -> key1 + "/" + key2, JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),stringSerde, stringSerde, stringSerde);
出现的错误是类型不匹配:无法从字符串转换为 R这是加入 kstreams 的语法join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Joined<K,V,VO> joined)
请解释一下ValueJoiner<? super V,? super VO,? extends VR>
到底是做什么的。谢谢
使用匹配记录的两个值调用ValueJoiner
,并发出联接结果值。
// key type must be the same for a join
// value type can be different
KStream<KeyType, ValueType1> stream1 = ...
KStream<KeyType, ValueType2> stream2 = ...
KStream<KeyType, OutputType> joined = stream1.join(stream2, ...);
因此,ValueJoiner
必须将ValueType1
作为第一个通用(? super V
(,ValueType2
作为第二个通用(? super VO
(。对于第三个泛型 (? extend VR
(,您可以指定输出类型(即,来自上例的OutputType
(。
更新
您还需要为运行时配置正确的 Serdes。如果所有类型都相同,最好通过StreamsConfig
相应地设置键和/或值的默认 serdes。否则,您可以覆盖每个运算符的默认 Serdes:
- https://docs.confluent.io/current/streams/developer-guide/config-streams.html#default-key-serde
- https://docs.confluent.io/current/streams/developer-guide/datatypes.html