如何在Flink流处理窗口中收集后期数据



考虑我有一个包含事件时间数据的数据流。我想在8毫秒的窗口时间内收集输入数据流,并减少每个窗口的数据。我使用以下代码:

aggregatedTuple
.keyBy( 0).timeWindow(Time.milliseconds(8))
.reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()

数据流的关键是处理时间的时间戳映射到处理毫秒的时间戳的最后8个倍数,例如1531569851297将映射到1531569851296

但数据流可能到达较晚,并进入错误的窗口时间。例如,假设我将窗口时间设置为8毫秒。如果数据按顺序进入Flink引擎,或者至少延迟小于窗口时间(8毫秒(,这将是最好的情况。但假设数据流事件时间(也是数据流中的一个字段(以30毫秒的延迟到达。所以它会进入错误的窗口,我想如果我检查每个数据流的事件时间,因为它想进入窗口,我可以在这么晚的时候过滤数据。所以我有两个问题:

  • 如何在数据流想要进入窗口时对其进行过滤,并检查数据是否在窗口的正确时间戳创建
  • 如何在变量中收集如此晚的数据以对其进行处理

Flink有两个不同的相关抽象,它们处理在具有事件时间戳的流上计算窗口分析的不同方面:水印允许延迟

首先,水印,无论何时处理事件时间数据(无论是否使用windows(,它都会发挥作用。水印为Flink提供了有关事件时间进度的信息,并为应用程序编写者提供了一种处理无序数据的方法。水印随数据流流动,每个水印都标记流中的一个位置并携带时间戳。水印的作用是断言,在流中的那个点上,流现在(可能(已经完成到那个时间戳——或者换句话说,水印之后的事件不太可能来自水印指示的时间之前。最常见的水印策略是使用BoundedOutOfOrdernessTimestampExtractor,它假设事件在某个固定的、有界的延迟内到达。

这现在提供了延迟的定义——时间戳小于水印时间戳的水印之后的事件被视为延迟

窗口API提供了允许延迟的概念,默认情况下将其设置为零。如果允许的延迟大于零,则事件时间窗口的默认触发器将在其相应的窗口中接受延迟事件,最高不超过允许的延迟限制。窗口操作将在正常时间启动一次,然后针对每个延迟事件再次启动,直到允许的延迟间隔结束。之后,将丢弃后期事件(如果已配置,则将其收集到侧输出(。

How can I filter data stream as it wants to enter the window and check 
if the data created at the right timestamp for the window?

Flink的窗口分配器负责将事件分配给适当的窗口——正确的事情会自动发生。将根据需要创建新的窗口实例。

How can I gather such late data in a variable to do some processing on them?

您可以在水印方面足够慷慨,以避免出现任何延迟数据,和/或将允许的延迟配置为足够长,以适应延迟事件。但是,请注意,Flink将被迫打开所有仍在接受后期事件的窗口,这将延迟垃圾收集旧窗口,并可能消耗大量内存。

请注意,本讨论假设您希望使用时间窗口,例如您正在使用的8秒长的窗口。Flink还支持计数窗口(例如,将事件分组为100个批次(、会话窗口和自定义窗口逻辑。例如,如果使用计数窗口,水印和延迟就不会起任何作用。

如果您想要分析的每个键的结果,那么在应用窗口之前,请使用keyBy按键(例如,按userId(对流进行分区。例如

stream
.keyBy(e -> e.userId)
.timeWindow(Time.seconds(10))
.reduce(...)

将为每个userId生成单独的结果。

更新:请注意,在最近版本的Flink中,windows现在可以将后期事件收集到侧输出中。

一些相关文件:

事件时间和水印
允许的延迟

相关内容

  • 没有找到相关文章

最新更新