我有一些设备状态更改的流,例如:case class DeviceState(ts: Long, state: Int)
.设备仅在更改时发送其状态。因此,例如,它可以是这样的:
ts | state
----------
0 | ONLINE
3 | OFFLINE
11 | ONLINE
19 | OFFLINE
(在实际代码中ts
是 unix 时间毫秒,我简化了它以达到示例目的(我想通过翻转 10 个刻度的窗口来分区此流并计算每个状态的总持续时间,因此例如,如果标点符号是在刻度 45 处发出的,则结果应如下所示:
window | state | duration
-----------------------------
0 - 10 | ONLINE | 3
0 - 10 | OFFLINE | 7
10 - 20 | OFFLINE | 2
10 - 20 | ONLINE | 8
20 - 30 | OFFLINE | 10
30 - 40 | OFFLINE | 10
是否可以在 Flink 中进行这样的持续时间计算?我认为它可以通过自定义reduce函数来实现,但是我不知道如何发出最后一个状态,因此它将出现在每个窗口中(在上面的示例中,最后一个状态是在tick 19,但它仍然应该在窗口20-30,30-40等中使用(。
使用 Flink 的窗口 API,在将事件分配给它之前,窗口不存在,这使得你尝试做的事情变得更加困难。
一种解决方案可能是使用带有计时器的 ProcessFunction 将第三种类型的事件混合到流中,该事件仅用于触发原本为空的窗口。
另一种解决方案是使用ProcessFunction(带有一些状态和计时器(而不是窗口来完成计算分析的所有工作。