如何在Flink中使用单个窗口进行多个聚合?



我是Flink的新手,我想做一些我在Spark中做过很多次的事情。

例如,在Spark中,我可以在

下面这样做
ds.groupByKey(???).mapGroups(???) // aggregate 1
.groupByKey(???).mapGroups(???) // aggregate 2

第一个聚合处理一批输入数据,第二个聚合处理第一个聚合的输出。我需要的是第二次聚合的产出

但是在Flink中,似乎任何聚合都应该与下面的特定窗口一起执行

ds.keyBy(???)
.window(???) // window 1
.aggregate(???) // aggregate 1
.keyBy(???)
.window(???) // window 2
.aggregate(???) // aggregate 2

如果我设置了窗口2,那么第二个聚合的输入数据可能不是第一个聚合的输出,这将违背我的愿望。

我想对同一批数据进行多个连续聚合,这些数据可以在单个窗口中收集。如何在Flink中实现?

谢谢你的帮助。


更新获取更多详细信息。

窗口必须有自己的策略,例如我可以像下面这样设置窗口策略

ds.keyBy(key1)
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.HOURS))) // window 1, 1 hour tumbling window
.aggregate(???) // aggregate 1
.keyBy(key2)
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.MINUTES))) // window 2, 1 minute tumbling window
.aggregate(???) // aggregate 2

窗口1可以在一个小时的滚动时间窗口中收集10亿行,聚合后输出100万行。

我想对聚合2中的100万行做一些计算,但是我不知道哪种窗口策略可以恰好收集这100万行。

如果我像上面那样设置带有滚动时间窗口的窗口2,它可能会将这一百万行分成两个批,并且聚合2的输出将不是我需要的。

可以通过使用事件时间窗口而不是处理时间窗口来避免这个问题。如果事件中还没有时间戳作为计时的基础,那么可以这样使用ingestion-time时间戳


WatermarkStrategy<MyType> watermarkStrategy =
WatermarkStrategy
.<MyType>forMonotonousTimestamps()
.withTimestampAssigner(
(event, streamRecordTimestamp) -> Instant.now());
DataStream<MyType> timestampedEvents = ds
.assignTimestampsAndWatermarks(watermarkStrategy);
timestampedEvents.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MINUTES)))
.aggregate(...)
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.HOURS)))
.aggregate(...)

这是有效的,因为第一个窗口产生的每个事件都将使用分配给它们的窗口结束的时间戳进行时间戳。这要求第二个窗口的持续时间与第一个窗口的持续时间相同,或者是第一个窗口持续时间的倍数。

同样,任意更改窗口2使用的键分区(与窗口1相比)可能会产生无意义的结果。

最新更新