我正在尝试使用flink的数据流API来协调第2组数据流。
stream1.coGroup(stream2)
.where(stream1Item -> streamItem.field1)
.equalTo(stream2Item -> stream2Item.field1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
.apply(someCoGroupFunction)
.sinkTo(someSink)
上面的代码在stream1.field1 = stream2.field1
上进行coGrouping时成功工作。但是下面这种复合场等式是如何为coGoruping建立的呢?
stream1.field1 = stream2.field1 AND stream1.field2 = stream2.field2 AND (stream1.field3 = stream2.fieldN OR stream1.field4 = stream2.fieldN)
使用Flink的SQL API,可以像一样完美地编写这样的复合联接条件
LEFT JOIN ON
Table1.field1 = Table2.field1 AND
Table1.field2 = Table2.field2 AND
(Table1.field3 = Table2.fieldN OR Table1.field4 = Table2.fieldN)
但是,如何在具有用于联接/共组的复合复杂条件的数据流API中实现相同的功能?
DataStream联接只支持相等谓词,但我认为您可以在coGroupFunction内部实现额外的约束。或者你可以取两个coGrouping的并集,Or的每一边一个。
但是,考虑到DataStream和TableAPI是可互操作的,为什么不将Table/SQL API用于管道的这一部分呢?