我想在Flink中的WindowedStream上执行一些操作,比如平均。但是预定义的可用操作非常有限,例如总和、最小值、最大值等。
val windowedStream = valueStream
.keyBy(0)
.timeWindow(Time.minutes(5))
.sum(2) //Change this to average?
假设我想找到平均值,我该怎么做?
Flink 没有内置函数来计算WindowStream
的平均值。您必须为此实现自定义WindowFunction
。
最有效的方法是实现一个计算要取平均值的值的计数和总和的ReduceFunction
,以及一个获取ReduceFunction
结果并计算平均值的后续WindowFunction
。使用 ReduceFunction
更有效,因为 Flink 直接将其应用于传入值。因此,它会动态聚合值,而不会在窗口中收集它们。这大大减少了窗口的内存占用。
由于ReduceFunction
的输出与其输入的类型相同,因此您需要在应用ReduceFunction
之前为计数添加一个字段。
像下面这样的东西应该可以解决问题:
val valueStream: DataStream[(String, Double)] = ???
val r: DataStream[(String, Double)] = valueStream
// append a 1L for counting
.map(x => (x._1, x._2, 1l))
// key and window stream
.keyBy(0).timeWindow(Time.minutes(5))
.apply(
// ReduceFunction (compute sum and count)
(x: (String, Double, Long), y: (String, Double, Long)) =>
(x._1, x._2 + y._2, x._3 + y._3),
// WindowFunction
(key, window: TimeWindow, input: Iterable[(String, Double, Long)], out: Collector[(String, Double)]) => {
// get first (and only) value
val x: (String, Double, Long) = input.toIterator.next
// compute average as sum / count
out.collect(x._1, x._2 / x._3)
}
)