我的flink应用程序旨在处理来自传感器的物联网数据。传感器通过网关发送数据。这就是示例数据的样子case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)
来自同一传感器的数据可以来自不同的网关
如果网关与网络断开连接,那么我会收到关于此case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
的特殊事件,并使用与传感器的主数据流连接的广播流
传感器可能在两种情况下不发送数据,
- 它坏了
- 网关与网络断开连接(将接收广播流中的
GatewayEvents("gwId","disconnected",1617979694)
消息(
如果我收到某个网关与网络断开连接的消息,并且通过它发送数据的传感器停止发送数据(例如,在1分钟内(,我需要创建一个特殊事件
我的半实现实现如下:
case class Data(sensorId: String, value: Float, gatewayId: String)
case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
val sensorData: DataStream[Data] ...
val gwData: DataStream[GatewayEvents] ...
val gatewayBroadcastStateDescriptor = new MapStateDescriptor[String, GatewayEvents]("gatewayEvents", classOf[String], classOf[GatewayEvents])
val broadcastGatewayEventsStream = gwData.broadcast(gatewayBroadcastStateDescriptor)
val events: sensorData.
.keyBy(_.sensorId)
.connect(broadcastGatewayEventsStream)
.process(...)
无法实现此过程。有什么想法吗?我认为SessionWindows会帮助我,但我不知道如何最好地做到
因此,我认为在这种情况下,最简单的想法是使用计时器。因此,基本上,您可以实现KeyedCoProcess
功能,如果它收到GatewayDisconnected
消息,您将注册定时器(处理时间(,以便在所需时间后启动。如果传感器收到任何消息,您只需删除注册的计时器,这样它就不会启动。在onTimer
函数内部,您可以简单地发出所需的事件,因为如果计时器触发,则意味着时间跨度中没有值。
这里需要注意的一点是,如果您keyBy(_.sensorId)
,则意味着将为通过该网关接收到的每个传感器生成事件。如果只想为gatewa发出一个事件,只需将分区更改为keyBy(_.gatewayId)
即可。