Flink广播状态在进程函数内部实现会话窗口



我的flink应用程序旨在处理来自传感器的物联网数据。传感器通过网关发送数据。这就是示例数据的样子case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)来自同一传感器的数据可以来自不同的网关

如果网关与网络断开连接,那么我会收到关于此case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)的特殊事件,并使用与传感器的主数据流连接的广播流

传感器可能在两种情况下不发送数据,

  • 它坏了
  • 网关与网络断开连接(将接收广播流中的GatewayEvents("gwId","disconnected",1617979694)消息(

如果我收到某个网关与网络断开连接的消息,并且通过它发送数据的传感器停止发送数据(例如,在1分钟内(,我需要创建一个特殊事件

我的半实现实现如下:

case class Data(sensorId: String, value: Float, gatewayId: String)
case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
val sensorData: DataStream[Data] ...
val gwData: DataStream[GatewayEvents] ...
val gatewayBroadcastStateDescriptor = new MapStateDescriptor[String, GatewayEvents]("gatewayEvents", classOf[String], classOf[GatewayEvents])
val broadcastGatewayEventsStream = gwData.broadcast(gatewayBroadcastStateDescriptor)
val events: sensorData.
.keyBy(_.sensorId)
.connect(broadcastGatewayEventsStream)
.process(...)

无法实现此过程。有什么想法吗?我认为SessionWindows会帮助我,但我不知道如何最好地做到

因此,我认为在这种情况下,最简单的想法是使用计时器。因此,基本上,您可以实现KeyedCoProcess功能,如果它收到GatewayDisconnected消息,您将注册定时器(处理时间(,以便在所需时间后启动。如果传感器收到任何消息,您只需删除注册的计时器,这样它就不会启动。在onTimer函数内部,您可以简单地发出所需的事件,因为如果计时器触发,则意味着时间跨度中没有值。

这里需要注意的一点是,如果您keyBy(_.sensorId),则意味着将为通过该网关接收到的每个传感器生成事件。如果只想为gatewa发出一个事件,只需将分区更改为keyBy(_.gatewayId)即可。

相关内容

  • 没有找到相关文章

最新更新