当用EventTimeSessionWindows
在Flink中实现AggregateFunction
时,我无法理解在具有动态间隙的SessionWindow的情况下何时发生合并。
代码片段:
SingleOutputStreamOperator<Tuple1<String>> aggregateData = parsedData.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(20)))
.keyBy(new ZeusRawKeyByFunction())
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<ZeusEvent>() {
@Override
public long extract(ZeusEvent event) {
if (event.getEventTypeName().equals("PlaybackSessionClosed")) {
return 100;
} else {
return Time.minutes(30).toMilliseconds();
}
}
}))
.allowedLateness(Time.minutes(10))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.sideOutputLateData(lateEvents)
.aggregate(new ZeusAggregateFunction())
.setParallelism(parameterTool.getInt("zeus-aggregator-parallelism"))
.name("Zeus Aggregator")
我在聚合器中定义了四个功能:
createAccumulator
:这将创建一个新的累加器add
:这将继续将1分钟触发时间内的所有新事件添加到累加器getResult
:这将使最后一行写入触发的接收器merge
:什么时候可以?每个触发器都会发生合并吗?
我正在努力了解触发器是否每分钟都会发生合并,并且会创建一个新的累加器并与前一个累加器合并。
考虑一个时间戳为e.t的事件e和一个计算为间隙(e(的动态间隙。
当每个事件e到达窗口操作符时,它最初被分配给从e.t
扩展到e.t + gap(e)
的新会话窗口。然后,窗口操作符在所有会话上迭代(对每个键独立(,每当两个会话(在时间上(重叠时,它们就会合并,形成一个新的、更长的会话,覆盖两个会话的时间跨度的并集。这种情况一直持续到无法进一步合并为止。
每次合并发生时,都会调用触发器的onMerge
方法,以及累加器的merge
方法。
然后调用触发器的onElement
方法,传入事件e。这将确保适当的定时器就位,以便当水印经过会话窗口的末尾(包括适当的间隙(时,窗口将FIRE。
因此,合并是在处理每个事件时完成的,并且不与连续/周期性触发的定时耦合。