加入两个卡夫卡流



试图加入两个 Kstream,但我收到类型不匹配错误 下面是代码片段。

KStream<String, String> longCounts = netExpence.join(netIncome, (key1, key2) -> key1 + "/" + key2,                    JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),stringSerde, stringSerde, stringSerde);

出现的错误是类型不匹配:无法从字符串转换为 R这是加入 kstreams 的语法join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Joined<K,V,VO> joined)

请解释一下ValueJoiner<? super V,? super VO,? extends VR>到底是做什么的。谢谢

使用匹配记录的两个值调用ValueJoiner,并发出联接结果值。

// key type must be the same for a join
// value type can be different
KStream<KeyType, ValueType1> stream1 = ...
KStream<KeyType, ValueType2> stream2 = ...
KStream<KeyType, OutputType> joined = stream1.join(stream2, ...);

因此,ValueJoiner必须将ValueType1作为第一个通用(? super V(,ValueType2作为第二个通用(? super VO(。对于第三个泛型 (? extend VR(,您可以指定输出类型(即,来自上例的OutputType(。

更新

您还需要为运行时配置正确的 Serdes。如果所有类型都相同,最好通过StreamsConfig相应地设置键和/或值的默认 serdes。否则,您可以覆盖每个运算符的默认 Serdes:

  • https://docs.confluent.io/current/streams/developer-guide/config-streams.html#default-key-serde
  • https://docs.confluent.io/current/streams/developer-guide/datatypes.html

相关内容

  • 没有找到相关文章

最新更新