即使Flink具有一些内置工具来处理较晚的数据,例如允许延迟,我还是想自己处理迟到的数据。例如,我想监视迟到事件,或将它们保存到数据库中。
我该怎么做?
通常在窗户操作员中使用迟到和水印。而且,如果您使用窗口操作员,则可以使用这样的侧面图:
val windowStream = eventStream.keyBy(output => output.rule)
.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
.sideOutputLateData(lateOutputTag)
并从这样的后输出获取较晚元素:
windowStream.getSideOutput(lateOutputTag).print()
ProcessFunctions(ProcessFunction
,KeyedProcessFunction
等)通过Context
对象提供对记录的事件时间戳和TimerService
的访问权限。TimerService
可访问当前水印。
您可以通过比较事件时间戳和水印来识别后期记录。如果时间戳较小或等于水印,则该事件迟到。
这取决于您如何处理晚事。您可以标记它们,可以丢弃它们,通过侧输出发射它们或与它们执行任何类型的计算。