我正在实现 Flink 数据流以进行一些实时数据计算。这样我就可以从两种类型的源中获取数据流值。我需要根据一些键进行一些转换。当我使用RichCoMapFunction时,Mapstate对全局不可见。我的程序如下
class Transformer extends RichCoMapFunction[(String, Map[String, String]), (String, Map[String, String]), Map[String, String]] {
private var sourceMap1: MapState[String, Map[String, String]] = _
private var sourceMap2: MapState[String, Map[String, String]] = _
override def map1(in1: (String, Map[String, String])): Map[String, String] = {
sourceMap1.put(in1._2("key"), in1._2)
println(sourceMap1.keys()) // Working with updated values
println(sourceMap2.keys()) // Return empty value always
return in1._2
}
override def map2(in2: (String, Map[String, String])): Map[String, String] = {
sourceMap2.put(in2._2("key"), in2._2)
println(sourceMap1.keys()) // Return empty value always
println(sourceMap2.keys()) // Working with updated values
return in2._2
}
override def open(parameters: Configuration): Unit = {
val desc1: MapStateDescriptor[String, Map[String, String]] = new MapStateDescriptor[String, Map[String, String]]("sourceMap1", classOf[String], classOf[Map[String, String]])
sourceMap1 = getRuntimeContext.getMapState(desc1)
val desc2: MapStateDescriptor[String, Map[String, String]] = new MapStateDescriptor[String, Map[String, String]]("sourceMap2", classOf[String], classOf[Map[String, String]])
sourceMap2 = getRuntimeContext.getMapState(desc2)
}
}
我需要在 map1 函数中访问 sourceMap2,因为它被声明为全局。但是当我尝试在 map2 函数中打印 sourceMap1 的键时,它总是返回为空值。但是,如果我在 map1 函数中打印 sourceMap1,则意味着它将打印所有添加的键。
当使用键控状态时,Flink 会为每个键值存储一个单独的状态值。这意味着,如果你有一个有状态映射器m
状态s
并且你处理记录(x1, y1)
和(x2, y2)
其中x
是键,Flink 将在其状态后端存储s(x1) = (x1, v1)
和s(x2) = (x2, v2)
。
处理 (x2, y2)
时,您只能访问s(x2)
,无法访问s(x1)
。
我认为这就是您看到大概空MapState
的原因.map1
和map2
的传入记录将具有不同的keys
,因此,您可以map1
访问未存储任何键值对的键(不是映射键,而是keyBy
键)的sourceMap2
。这同样适用于在尚未存储键值对的键下访问sourceMap1
map2
。
转换器类将应用于两个连接的键控流。 sourceMap1 和 sourceMap2 是键控状态,这意味着对于两个连接的流的每个键,您都有一个单独的嵌套哈希映射。每次调用 map1 或 map2 时,这些映射中的一对都在范围内,即对应于被映射项目的键的对。
相反,如果您希望在所有键之间共享全局状态,请查看广播状态模式。