我想在 Flink 中创建一个基于 EventTime 的会话窗口,这样当新消息的事件时间比创建窗口的消息的事件时间大 180 秒以上时,它就会触发。
例如:
t1(0 seconds) : msg1 <-- This is the first message which causes the session-windows to be created
t2(13 seconds) : msg2
t3(39 seconds) : msg3
.
.
.
.
t7(190 seconds) : msg7 <-- The event time (t7) is more than 180 seconds than t1 (t7 - t1 = 190), so the window should be triggered and processed now.
t8(193 seconds) : msg8 <-- This message, and all subsequent messages have to be ignored as this window was processed at t7
我想创建一个触发器,以便通过适当的水印或 onEventTime 触发器实现上述行为。任何人都可以提供一些例子来实现这一点吗?
解决此问题的最佳方法可能是使用 ProcessFunction,而不是自定义窗口。如果如示例中所示,事件将按时间戳顺序处理,那么这将非常简单。另一方面,如果您必须处理无序事件(这在处理事件时间数据时很常见),则会稍微复杂一些。(想象一下,时间为 187 的 msg6 在 t8 之后到达。如果这是可能的,并且如果这会影响你想要产生的结果,那么这必须被处理。
如果事件按顺序排列,则逻辑大致如下所示:
使用升序时间戳提取器作为水印的基础。
使用 Flink 状态(可能是 ListState)来存储窗口内容。当事件到达时,将其添加到窗口中,并检查自第一个事件以来是否已超过 180 秒。如果是这样,请处理窗口内容并清除列表。
如果事件可能无序,请使用 BoundedOutOfOrdernessTimestampExtractor,并且在 currentWatermark 指示事件时间已超过窗口开始时间 180 秒之前不要处理窗口的内容(为此可以使用事件时间计时器)。触发窗口时不要完全清除列表,而只需删除属于正在关闭的窗口的元素。