我使用MapStateDescriptor
进行有状态计算。这里有一些代码
final val myMap = new MapStateDescriptor[String, List[String]]("myMap", classOf[String], classOf[List[String]])
在我的计算过程中,我想通过向List[String]
添加新元素来更新我的地图。
有可能吗?
更新#1
已经写了以下def来管理我的地图
def updateTagsMapState(mapKey: String, tagId: String, mapToUpdate: MapState[String, List[String]]): Unit = {
if (mapToUpdate.contains(mapKey)) {
val mapValues: List[String] = mapToUpdate.get(mapKey)
val updatedMapValues: List[String] = tagId :: mapValues
mapToUpdate.put(mapKey, updatedMapValues)
} else {
mapToUpdate.put(mapKey,List(tagId))
}
}
当然是。根据这是ScalaList
还是Java,你可以这样做,从描述符创建状态:
lazy val stateMap = getRuntimeContext.getMapState(myMap)
然后你可以简单地做:
val list = stateMap.get("someKey")
stateMap.put("someKey", list +: "SomeVal")
请注意,如果使用可变数据结构,则不一定需要再次调用put
,因为数据结构的更新也会更新状态。但是,这种方法在RocksDB状态的情况下不起作用,因为在这种情况下,只有在调用put
之后才会更新状态,所以总是建议更新状态本身,而不仅仅是底层对象。