我有一个关于在Flink上连接两个流的问题。我使用两种不同的数据流,有时我需要加入他们。每个数据流都标记了一个唯一的id,作为这些流之间的连接点。没有窗口的概念,所以为了连接这两个数据流,我做第一个。connect(second(。keyBy(0,0(。
当我得到正确的结果时,这似乎奏效了,但我的担忧是长期的。我没有明确保留执行联接的运算符(coFlatMap(的状态,但如果假设一个流(例如第一个(提供唯一的id,而第二个无法提供加入id(我想对于那些已经加入的人,操作符会丢弃任何类型的内部状态(?内存/状态占用是不断增长还是存在某种过期机制?
如果是这样的话,我该如何解决这个问题?或者你能给我另一种方法吗?
有几种方法可以实现这种联接。
-
使用CoProcessFunction。当密钥的第一条记录到达时,您将其存储在状态中,并注册一个计时器,该计时器将在x分钟/小时/天后启动。当第二条记录到达时,执行联接并清除状态。如果第二条记录没有到达,则在计时器触发时将调用
onTimer()
方法。在这一点上,您可以只清除状态并返回(INNER JOIN语义(,也可以转发用null
值填充的第一条记录(OUTER JOIN语义。计时器充当一个安全网,能够在某个时刻消除状态。这取决于您的要求,您希望等待第二张唱片到达的时间。 -
表API或SQL提供了一个时间窗口连接(表API,SQL(,其工作原理与我在1中描述的类似。不同之处在于,窗口连接实现将尝试连接在连接间隔期间到达的所有记录(即,每个输入流中的多个记录(,因此将使状态保持更长时间。但是,一旦时间超过联接间隔,就会清除状态。
-
Flink 1.6.0(将于2018年8月初发布(将包括DataStream API的间隔联接,其工作原理类似于Table API的窗口联接(类似逻辑,不同名称(。它还将使状态保持比自定义实现更长的时间,自定义实现基于每个键在每一侧只出现一次的假设。
我会选择方法1。因为它的内存效率更高,而且仍然相当容易实现。