Flink MapStateDescriptor更新列表值



我使用MapStateDescriptor进行有状态计算。这里有一些代码

final val myMap = new MapStateDescriptor[String, List[String]]("myMap", classOf[String], classOf[List[String]])

在我的计算过程中,我想通过向List[String]添加新元素来更新我的地图。

有可能吗?

更新#1

已经写了以下def来管理我的地图

def updateTagsMapState(mapKey: String, tagId: String, mapToUpdate: MapState[String, List[String]]): Unit = {
if (mapToUpdate.contains(mapKey)) {
val mapValues: List[String] = mapToUpdate.get(mapKey)
val updatedMapValues: List[String] = tagId :: mapValues
mapToUpdate.put(mapKey, updatedMapValues)
} else {
mapToUpdate.put(mapKey,List(tagId))
}
}

当然是。根据这是ScalaList还是Java,你可以这样做,从描述符创建状态:

lazy val stateMap = getRuntimeContext.getMapState(myMap)

然后你可以简单地做:

val list = stateMap.get("someKey")
stateMap.put("someKey", list +: "SomeVal")

请注意,如果使用可变数据结构,则不一定需要再次调用put,因为数据结构的更新也会更新状态。但是,这种方法在RocksDB状态的情况下不起作用,因为在这种情况下,只有在调用put之后才会更新状态,所以总是建议更新状态本身,而不仅仅是底层对象。

相关内容

  • 没有找到相关文章

最新更新