带SessionWindow的AggregateFunction-了解合并的工作原理



当用EventTimeSessionWindows在Flink中实现AggregateFunction时,我无法理解在具有动态间隙的SessionWindow的情况下何时发生合并。

代码片段:

SingleOutputStreamOperator<Tuple1<String>> aggregateData = parsedData.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(20)))
.keyBy(new ZeusRawKeyByFunction())
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<ZeusEvent>() {
@Override
public long extract(ZeusEvent event) {
if (event.getEventTypeName().equals("PlaybackSessionClosed")) {
return 100;
} else {
return Time.minutes(30).toMilliseconds();
}
}
}))
.allowedLateness(Time.minutes(10))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.sideOutputLateData(lateEvents)
.aggregate(new ZeusAggregateFunction())
.setParallelism(parameterTool.getInt("zeus-aggregator-parallelism"))
.name("Zeus Aggregator")

我在聚合器中定义了四个功能:

  • createAccumulator:这将创建一个新的累加器

  • add:这将继续将1分钟触发时间内的所有新事件添加到累加器

  • getResult:这将使最后一行写入触发的接收器

  • merge:什么时候可以?每个触发器都会发生合并吗?

我正在努力了解触发器是否每分钟都会发生合并,并且会创建一个新的累加器并与前一个累加器合并。

考虑一个时间戳为e.t的事件e和一个计算为间隙(e(的动态间隙

当每个事件e到达窗口操作符时,它最初被分配给从e.t扩展到e.t + gap(e)的新会话窗口。然后,窗口操作符在所有会话上迭代(对每个键独立(,每当两个会话(在时间上(重叠时,它们就会合并,形成一个新的、更长的会话,覆盖两个会话的时间跨度的并集。这种情况一直持续到无法进一步合并为止。

每次合并发生时,都会调用触发器的onMerge方法,以及累加器的merge方法。

然后调用触发器的onElement方法,传入事件e。这将确保适当的定时器就位,以便当水印经过会话窗口的末尾(包括适当的间隙(时,窗口将FIRE。

因此,合并是在处理每个事件时完成的,并且不与连续/周期性触发的定时耦合。

相关内容

  • 没有找到相关文章

最新更新