我试图理解Flink上的IntervalJoin操作,并得到了一个问题。
假设我们有三个流A, B
和C
,
这里,我们间隔连接两个流,如A-C
和B-C
。
在java代码中,它将显示如下:
// join stream A and stream C
SingleOutputStreamOperator<SensorReadingOutput> joined1 = A
.intervalJoin(C)
.between(Time.seconds(-1), Time.seconds(0))
.process(new IntervalJoinFunction());
// join stream B and stream C
SingleOutputStreamOperator<SensorReadingOutput> joined2 = B
.intervalJoin(C)
.between(Time.seconds(-1), Time.seconds(0))
.process(new IntervalJoinFunction());
如我们所见,流C
被连接了两次。这里,流
C
可以在两个流A
和B
之间共享吗?也就是说,流
C
是作为单个流存在还是作为每个A
和B
的复制(副本)存在?
我很困惑,因为在IntervalJoin操作两点。
- 每次在interval join的最后调用
.process
时,我们都会创建新的IntervalJoinOperator。我认为C
流应该被复制。 - 在IntervalJoinOperator中,使用由基于事件时间的水印触发的内部定时器服务来清理记录。
A
和B
会有不同的水印,我认为这会影响C
的保留期,所以C
应该单独复制和管理。
然而,当我编写测试代码以查看是否在同一任务实例中收集了具有相同键的三个流记录时,它们确实收集了。
有人知道事实吗?谢谢你!
对于想知道同样问题的人来说,答案是'他们不共享流'。
相反,将为另一个IntervalJoin创建另一个重复流。
我已经做了一些测试,在IntervalJoinOperator中打印缓冲区的地址。
对于A-C
和B-C
的连接,与A
和B
连接的记录C
的相同值显示的地址不同。如果数据流是共享的,那么数据流C
中记录的地址是相同的。
我认为这是因为两个原因。
- 每当为keyyedstream调用
.intervalJoin
时,都会创建新的IntervalJoinOperator,并且它包含自己的缓冲区。每次都会创建新的缓冲区,因此共享流是没有意义的。 - 同时,
A
和B
的水印也不同。在IntervalJoinOperator中,水印决定了缓冲区的保留时间,因此共享缓冲区在这里也没有意义。