如何在 RichCoMapFunction [ Apache Flink ] 中获取全局声明的 MapState 值



我正在实现 Flink 数据流以进行一些实时数据计算。这样我就可以从两种类型的源中获取数据流值。我需要根据一些键进行一些转换。当我使用RichCoMapFunction时,Mapstate对全局不可见。我的程序如下

 class Transformer extends RichCoMapFunction[(String, Map[String, String]), (String, Map[String, String]), Map[String, String]] {
private var sourceMap1: MapState[String, Map[String, String]] = _
private var sourceMap2: MapState[String, Map[String, String]] = _
override def map1(in1: (String, Map[String, String])): Map[String, String] = {
  sourceMap1.put(in1._2("key"), in1._2)     
  println(sourceMap1.keys())  // Working with updated values
  println(sourceMap2.keys())  // Return empty value always
  return in1._2
}
override def map2(in2: (String, Map[String, String])): Map[String, String] = {
  sourceMap2.put(in2._2("key"), in2._2)
  println(sourceMap1.keys()) // Return empty value always
  println(sourceMap2.keys()) // Working with updated values
  return in2._2
}
override def open(parameters: Configuration): Unit = {
  val desc1: MapStateDescriptor[String, Map[String, String]] = new MapStateDescriptor[String, Map[String, String]]("sourceMap1", classOf[String], classOf[Map[String, String]])
  sourceMap1 = getRuntimeContext.getMapState(desc1)
  val desc2: MapStateDescriptor[String, Map[String, String]] = new MapStateDescriptor[String, Map[String, String]]("sourceMap2", classOf[String], classOf[Map[String, String]])
  sourceMap2 = getRuntimeContext.getMapState(desc2)
}
 }

我需要在 map1 函数中访问 sourceMap2,因为它被声明为全局。但是当我尝试在 map2 函数中打印 sourceMap1 的键时,它总是返回为空值。但是,如果我在 map1 函数中打印 sourceMap1,则意味着它将打印所有添加的键。

当使用键控状态时,Flink 会为每个键值存储一个单独的状态值。这意味着,如果你有一个有状态映射器m状态s并且你处理记录(x1, y1)(x2, y2)其中x是键,Flink 将在其状态后端存储s(x1) = (x1, v1)s(x2) = (x2, v2)

处理 (x2, y2) 时,您只能访问s(x2),无法访问s(x1)

我认为这就是您看到大概空MapState的原因.map1map2的传入记录将具有不同的keys,因此,您可以map1访问未存储任何键值对的键(不是映射键,而是keyBy键)的sourceMap2。这同样适用于在尚未存储键值对的键下访问sourceMap1 map2

转换器类将应用于两个连接的键控流。 sourceMap1 和 sourceMap2 是键控状态,这意味着对于两个连接的流的每个键,您都有一个单独的嵌套哈希映射。每次调用 map1 或 map2 时,这些映射中的一对都在范围内,即对应于被映射项目的键的对。

相反,如果您希望在所有键之间共享全局状态,请查看广播状态模式。

相关内容

  • 没有找到相关文章

最新更新