我有一个高度并行化的聚合,其中有很多键,我在多个节点上运行。 然后,我想对所有值进行汇总聚合,类似于下面的代码:
val myStream = sourceStream
.keyBy( 0 )
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(_ + _)
.addSink(new OtherSink)
val summaryStream = myStream
.map(Row.fromOtherRow(_))
// parallelism is 1 by definition
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(_ + _)
.addSink(new RowSink)
这工作正常,但我注意到最终执行 windowAll(( 的节点获得了大量的入站网络流量以及该节点 CPU 上的显着峰值。 这显然是因为所有数据都聚合在一起,并行度为"1"。
Flink 中是否有任何当前或计划的规定来做更多的两层汇总聚合,将所有数据保留在每个节点上,在将结果发送到第二层进行最终聚合之前对其进行预聚合? 以下是我希望找到的一些伪代码:
val myStream = sourceStream
.keyBy( 0 )
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(_ + _)
.addSink(new OtherSink)
val summaryStream = myStream
.map(Row.fromOtherRow(_))
// parallelism would be at the default for the env
.windowLocal(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(_ + _)
// parallelism is 1 by definition
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(_ + _)
.addSink(new RowSink)
我称它为"windowLocal((",但我相信可能有一个更好的名字。 它将像 windowAll(( 一样是非键控的。 主要好处是,它将减少网络和CPU和内存命中windowAll((具有,方法是将其分布在您正在运行的所有节点上。 我目前必须为我的节点分配更多资源以适应此摘要。
如果这可以通过当前版本以其他方式完成,我很想听听。 我已经考虑过为第二层的密钥使用随机值,但我相信这会导致数据的完全重新平衡,因此它解决了我的 CPU 和内存问题,但不能解决网络问题。 我正在寻找与 rescale(( 相同的内容,其中数据保留在任务管理器或插槽的本地。
使用 FoldFunction 的增量窗口聚合
下面的示例演示如何将增量 FoldFunction 与 WindowFunction 结合使用,以提取窗口中的事件数,并返回窗口的键和结束时间。
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.fold (
("", 0L, 0),
(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
( key: String,
window: TimeWindow,
counts: Iterable[(String, Long, Int)],
out: Collector[(String, Long, Int)] ) =>
{
val count = counts.iterator.next()
out.collect((key, window.getEnd, count._3))
}
)
使用ReduceFunction的增量窗口聚合
下面的示例演示如何将增量 ReduceFunction 与 WindowFunction 结合使用,以返回窗口中的最小事件以及窗口的开始时间。
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
window: TimeWindow,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((window.getStart, min))
}
)
您想了解更多 看这里enter code here
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html