卡夫卡流通过键连接,条件复杂



我正在尝试通过键连接KStreamGlobalKTable,但具有特定的逻辑。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC"
GlobalKTable<String, Integer> table = builder.globalTable(inputTopic2); // key = "ABC"
stream.join(table, // join first by "ABC" = "ABC", then by "AB" = "AB", then by "A" = "A"
(key, value) -> key,
(valueLeft, valueRigth) -> {/* identify by which condition the join was performed */});

例如,如果键 = "ABC",则:

  • 首先,通过完整键连接 - 即"ABC"="ABC">
  • 然后,如果未连接,则通过前两个符号连接(删除一个符号( - 即"AB"="AB">
  • 最后,尝试仅使用一个符号连接 - 即"A" = "A">

此外,还需要知道执行连接的条件 - 例如,通过 3 个字母/2 个字母/1 个字母。

问题是,是否有可能,或者我应该寻找解决方法?例如,使用相应的键(带有"ABC"键的表,一个带有"AB"键和一个带有"A"键的表(复制 GlobalKTable 并执行 3 个单独的连接?或者还有其他建议?

提前感谢!

对多个表使用一系列左连接是可能的(如果您经常知道要尝试连接(。如果联接成功,则跳过下一个联接。结合使用leftJoin()branch()应该允许您在每次加入后将流拆分为"已加入"和"重试"。最后,如果需要,您可以将不同的结果流merge()在一起。

最新更新