Apache Flink广播状态被冲洗了



我正在使用广播模式连接两个流并从一个流读取数据。代码看起来像这样

case class Broadcast extends BroadCastProcessFunction[MyObject,(String,Double), MyObject]{
  override def processBroadcastElement(in2: (String, Double), 
                                       context: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#Context,
                                       collector:Collector[MyObject]):Unit={
    context.getBroadcastState(broadcastStateDescriptor).put(in2._1,in2._2)
  }
  override def processElement(obj: MyObject,
                            readOnlyContext:BroadCastProcessFunction[MyObject, (String,Double), 
                            MyObject]#ReadOnlyContext, collector: Collector[MyObject]):Unit={
    val theValue = readOnlyContext.getBroadccastState(broadcastStateDesriptor).get(obj.prop)
    //If I print the context of the state here sometimes it is empty.
    out.collect(MyObject(new, properties, go, here))
  }
}

状态描述符:

val broadcastStateDescriptor: MapStateDescriptor[String, Double) = new MapStateDescriptor[String, Double]("name_for_this", classOf[String], classOf[Double])

我的执行代码看起来像这样。

val streamA :DataStream[MyObject] = ... 
val streamB :DataStream[(String,Double)] = ... 
val broadcastedStream = streamB.broadcast(broadcastStateDescriptor)
streamA.connect(streamB).process(new Broadcast)

问题在processElement功能中,状态有时为空,有时不是。该状态应始终包含数据,因为我不断地从我知道它具有数据的文件中流式传输。我不明白为什么要冲洗状态,而我无法获取数据。

我尝试在将数据放到状态之前和之后在processBroadcastElement中添加一些打印,结果是以下

0 - 1
1 - 2 
2 - 3 
.. all the way to 48 where it resets back to 0

更新:我注意到的是,当我降低流动执行上下文的超时值时,结果会好一些。当我增加它时,地图总是空的。

env.setBufferTimeout(1) //better results 
env.setBufferTimeout(200) //worse result (default is 100)

每当两个流中连接到弗林克时,您就无法控制弗林克将事件从两个流传递到您的用户功能的时机。因此,例如,如果有一个可以从StreamA处理的事件,并且可以从StreamB处理的事件,则接下来可以处理任何一个。您不能指望广播中的以某种方式优先于另一个流。

根据您的要求,您可能会采用各种策略来应对这两个流之间的比赛。例如,您可以使用KeyedBroadcast ProcessFunction并使用其ApplakeYedState方法在所有现有的键盘状态时迭代。

正如大卫提到的那样,这项工作可能正在重新启动。我禁用了检查点,因此我可以看到任何可能的例外,而不是静静地失败和重新启动作业。

事实证明,试图解析文件时存在错误。因此,工作一直在重新启动,因此状态是空的,flink不断地一遍又一遍地消耗流。

相关内容

  • 没有找到相关文章

最新更新