How do I compute an aggregate over a window when the sensor readings have not changed since the last event?



How can I compute an aggregate over a window from a sensor when a new event is only sent if the sensor value has changed since the last event? The sensor readings are taken at fixed times, e.g. every 5 seconds, but are only forwarded if the reading has changed since the previous one.

So, if I want to compute the average signal_strength per device:

eventsDF = ... 
avgSignalDF = eventsDF.groupBy("deviceId").avg("signal_strength")
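
Since the title asks about a window, a variant of the same aggregation with an explicit event-time window might look like the sketch below; the column names follow the example data further down, and the 1-minute window size is an assumption:

import org.apache.spark.sql.functions.{col, window}

// group by a 1-minute event-time window per device (window length is an assumption)
val avgSignalPerWindowDF = eventsDF
  .groupBy(window(col("event_time"), "1 minute"), col("device_id"))
  .avg("signal_strength")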

For example, the events sent by a device during one minute:

event_time  device_id  signal_strength
12:00:00    1          5
12:00:05    1          4
12:00:30    1          5
12:00:45    1          6
12:00:55    1          5

The same data set, padded with the readings that were never actually sent as events:

event_time  device_id  signal_strength
12:00:00    1          5
12:00:05    1          4
12:00:10    1          4
12:00:15    1          4
12:00:20    1          4
12:00:25    1          4
12:00:30    1          5
12:00:35    1          5
12:00:40    1          5
12:00:45    1          6
12:00:50    1          6
12:00:55    1          5

The sum of signal_strength over the padded data is 57 and the average is 57/12 = 4.75.
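
As a plain-Scala sanity check of that arithmetic (not part of the streaming solution), forward-filling the sparse readings at 5-second steps gives the same numbers:

// (seconds, signal_strength) pairs of the events that were actually sent
val readings = Seq(0L -> 5, 5L -> 4, 30L -> 5, 45L -> 6, 55L -> 5)
// forward-fill every 5-second slot in the minute with the latest reading
val filled = (0L until 60L by 5L).map(t => readings.filter(_._1 <= t).last._2)
println(filled.sum)                        // 57
println(filled.sum.toDouble / filled.size) // 4.75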

How can I interpolate this missing data with Spark Structured Streaming and compute the average from the interpolated values?

Note: I am using the average as an example of an aggregation, but the solution needs to work for any aggregation function.

Edited:

I have modified the logic so that the average is computed only from the filtered dataframe while compensating for the gaps.

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// input structure
case class StreamInput(event_time: Long, device_id: Int, signal_strength: Int)
// columns for which we want to maintain state
case class StreamState(prevSum: Int, prevRowCount: Int, prevTime: Long, prevSignalStrength: Int,
                       currentTime: Long, totalRow: Int, totalSum: Int, avg: Double)
// final result structure
case class StreamResult(event_time: Long, device_id: Int, signal_strength: Int, avg: Double)

// assumes spark.implicits._ is in scope for the encoders
val filteredDF: Dataset[StreamInput] = ???  // get input (filtered rows only)
val interval = 5                            // event_time interval in seconds

// use mapGroupsWithState to maintain state for the running sum and total row count so far;
// set the timeout threshold to indicate how long you wish to maintain the state
val avgDF = filteredDF
  .groupByKey(_.device_id)
  .mapGroupsWithState[StreamState, StreamResult](GroupStateTimeout.NoTimeout()) {
    case (id: Int, eventIter: Iterator[StreamInput], state: GroupState[StreamState]) =>
      val events = eventIter.toSeq
      val updatedSession = if (state.exists) {
        // if state exists, update it with the new values
        val existingState = state.get
        val prevTime = existingState.currentTime
        val currentTime = events.map(_.event_time).last
        // number of 5-second slots (missed rows included) since the previous event
        val currentRowCount = ((currentTime - prevTime) / interval).toInt
        val rowCount = existingState.totalRow + currentRowCount
        val currentSignalStrength = events.map(_.signal_strength).last
        // fill the missed slots with the previous reading
        val totalSignalStrength = currentSignalStrength +
          existingState.prevSignalStrength * (currentRowCount - 1) +
          existingState.totalSum
        StreamState(
          existingState.totalSum,
          existingState.totalRow,
          prevTime,
          currentSignalStrength,
          currentTime,
          rowCount,
          totalSignalStrength,
          totalSignalStrength / rowCount.toDouble)
      } else {
        // if there is no earlier state, initialise it from the first batch of events
        val runningSum = events.map(_.signal_strength).sum
        val size = events.size.toDouble
        val currentTime = events.map(_.event_time).last
        StreamState(0, 1, 0, runningSum, currentTime, 1, runningSum, runningSum / size)
      }
      // save the updated state
      state.update(updatedSession)
      StreamResult(
        events.map(_.event_time).last,
        id,
        events.map(_.signal_strength).last,
        updatedSession.avg)
  }

val result = avgDF
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()

The idea is to compute two new columns:

  1. totalRowCount: a running count of the rows that would have been present if nothing had been filtered out.
  2. total_signal_strength: a running total of signal_strength so far (this also includes the missed rows).

It is computed as:

total_signal_strength =
    current row's signal_strength +
    (previous row's signal_strength * (rowCount - 1)) +
    previous total_signal_strength
// rowCount is the count of missed rows, computed by comparing the previous and current event_time
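
For example, for the event at 12:00:30 the previous state has rowCount = 2, total_signal_strength = 9 and previous signal_strength = 4. The number of 5-second slots since the last event is (30 - 5) / 5 = 5, so total_signal_strength = 5 + 4 * (5 - 1) + 9 = 30 and rowCount = 2 + 5 = 7, giving avg = 30 / 7 ≈ 4.2857, which matches the tables below.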

Format of the intermediate state:

+----------+---------+---------------+---------------------+--------+
|event_time|device_id|signal_strength|total_signal_strength|rowCount|
+----------+---------+---------------+---------------------+--------+
|         0|        1|              5|                    5|       1|
|         5|        1|              4|                    9|       2|
|        30|        1|              5|                   30|       7|
|        45|        1|              6|                   46|      10|
|        55|        1|              5|                   57|      12|
+----------+---------+---------------+---------------------+--------+

Final output:

+----------+---------+---------------+-----------------+
|event_time|device_id|signal_strength|              avg|
+----------+---------+---------------+-----------------+
|         0|        1|              5|              5.0|
|         5|        1|              4|              4.5|
|        30|        1|              5|4.285714285714286|
|        45|        1|              6|              4.6|
|        55|        1|              5|             4.75|
+----------+---------+---------------+-----------------+

Mathematically this is equivalent to a duration-weighted average problem:

avg = sum(signal_strength * duration) / 60

The challenge here is getting the duration of each signal. One option is to collect the result in the driver for each micro-batch; after that it is plain math. To get the duration, you can left-shift the window start times by one row and subtract, like this:

window.start.leftShift(1)-window.start

This will give you:

event_time  device_id  signal_strength duration
12:00:00    1          5                  5(5-0)
12:00:05    1          4                  25(30-5)
12:00:30    1          5                  15(45-30)
12:00:45    1          6                  10(55-45)
12:00:55    1          5                  5 (60-55)
(5*5+4*25+5*15+6*10+5*5)/60=57/12
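
A minimal sketch of that per-batch math, assuming the rows of one micro-batch have already been collected to the driver sorted by time and that the first reading sits at the window start (the helper name durationWeightedAvg is made up for illustration):

// duration of each reading = next reading's event_time (or the window end) minus its own
def durationWeightedAvg(rows: Seq[(Long, Int)], windowEnd: Long): Double = {
  val nextTimes = rows.map(_._1).drop(1) :+ windowEnd
  val weighted = rows.zip(nextTimes).map { case ((start, strength), next) =>
    strength * (next - start)
  }
  weighted.sum.toDouble / (windowEnd - rows.head._1)
}

// the example batch: (5*5 + 4*25 + 5*15 + 6*10 + 5*5) / 60 = 4.75
val batch = Seq((0L, 5), (5L, 4), (30L, 5), (45L, 6), (55L, 5))
println(durationWeightedAvg(batch, 60L))   // 4.75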

As of Spark Structured Streaming 2.3.2, you need to write your own custom sink to collect the result of each stage in the driver and do the math there.
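
For illustration only (this is not the custom-sink code itself): from Spark 2.4 onwards, foreachBatch exposes each micro-batch as an ordinary DataFrame on the driver side, which would let you do the collect-and-compute step without a full custom sink. windowedDF here is a placeholder for the windowed aggregation result.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.OutputMode

val windowedDF: DataFrame = ???   // placeholder: the windowed aggregation result

val query = windowedDF.writeStream
  .outputMode(OutputMode.Update())
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // collect the small per-window result and finish the duration-weighted math locally
    val rows = batchDF.collect()
    rows.foreach(println)
  }
  .start()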
