如何在通过reduce函数与当前状态合并之前聚合flink流中的事件



我的事件如下:case class Event(user: User, stats: Map[StatType, Int])

每个事件中都包含+1或-1个值。我有我目前的管道,它运行良好,但每次更改统计数据都会产生新的事件。

eventsStream
.keyBy(extractKey)
.reduce(reduceFunc)
.map(prepareRequest)
.addSink(sink)

在将这些增量与当前状态合并之前,我希望在一个时间窗口中聚合这些增量。所以我想要同样的滚动减少,但有一个时间窗口。

当前简单滚动减少:

500 – last reduced value
+1
-1
+1
Emitted events: 501, 500, 501 

滚动减少与窗口:

500 – last reduced value
v-- window
+1
-1
+1
^-- window
Emitted events: 501

我曾经尝试过将时间窗口放在reduce之前的天真解决方案,但在阅读了文档后,我发现reduce现在有不同的行为。

eventsStream
.keyBy(extractKey)
.timeWindow(Time.minutes(2))
.reduce(reduceFunc)
.map(prepareRequest)
.addSink(sink)

似乎我应该制作键控流,并在减少我的时间窗口后减少它:

eventsStream
.keyBy(extractKey)
.timeWindow(Time.minutes(2))
.reduce(reduceFunc)
.keyBy(extractKey)
.reduce(reduceFunc)
.map(prepareRequest)
.addSink(sink)

这是解决问题的正确途径吗?

可能有不同的选项,但其中之一是实现WindowFunction,然后在窗口化后运行apply

eventsStream
.keyBy(extractKey)
.timeWindow(Time.minutes(2))
.apply(new MyWindowFunction)

(WindowFuntion采用输入值类型、输出值类型和键类型的类型参数。(

这里有一个例子。让我复制相关片段:

/** User-defined WindowFunction to compute the average temperature of SensorReadings */
class TemperatureAverager extends WindowFunction[SensorReading, SensorReading, String, TimeWindow] {
/** apply() is invoked once for each window */
override def apply(
sensorId: String,
window: TimeWindow,
vals: Iterable[SensorReading],
out: Collector[SensorReading]): Unit = {
// compute the average temperature
val (cnt, sum) = vals.foldLeft((0, 0.0))((c, r) => (c._1 + 1, c._2 + r.temperature))
val avgTemp = sum / cnt
// emit a SensorReading with the average temperature
out.collect(SensorReading(sensorId, window.getEnd, avgTemp))
}

我不知道你的数据看起来怎么样,所以我不能尝试一个完整的答案,但这应该是一个灵感。

是的,您提出的管道将达到预期效果。该窗口将减少2分钟的批次。这些批次的结果将流入最终的reduce,该reduce将在其每个输入(即窗口结果(后产生更新的结果。

最新更新