Apache Flink MapState vs Value[Map[String, String]] usage



我按键划分流,并为每个键管理一个映射状态,如;

stream
.keyBy(_.userId)
.process(new MyStateFunc)

每次,我都要读取一个键下的所有值,计算一些值,然后只更新其中的一小部分。一个例子;

class MyStateFunc() .. {

val state = ValueState[Map[String, String]]
def process(event: MyModel...): {
val stateAsMap = state.value()
val updatedStateValues = updateAFewColumnsOfStateValByUsingIncomingEvent(event, stateAsMap)
doCalculationByUsingSomeValuesOfState(updatedStateValues)
state.update(updatedStateValues)
}
def updateAFewColumnsOfStateValByUsingIncomingEvent(event, state): Map[String, String] = {
val updateState = Map.empty
event.foreach {case (status, newValue) => 
updateState.put(status, newValue)
}
state ++ updatedState
}
def doCalculationByUsingSomeValuesOfState(stateValues): Map[String, String] = {
// do some staff by using some key and values
}
}

我不确定这是最有效的方法。是的,我必须读取所有的值(至少是其中的一些(才能进行计算,但我也需要更新其中的一些值,而不是存储在每个键中的所有Map。我只是想知道哪一个更有效;Value[Map[String, String]]MapState[String, String]

如果我使用MapState[String, String],我必须执行以下操作才能更新相关密钥;

val state = MapState[String, String]
def process(event: MyModel...): {
val stateAsMap = state.entries().asScala
event.foreach { case (status, newValue)
state.put(status, newValue)
}
}

我不确定尝试更新每个事件类型的状态是否有效。

mapState.putAll(changeEvents)

这只会覆盖相关密钥而不是所有密钥吗?

或者可以是另一种克服的方式?

如果您的州只有几个条目,那么它可能无关紧要。如果你的映射可以有大量的条目,那么使用MapState(带有RocksDB状态后端(应该可以显著降低序列化成本,因为你只更新了几个条目而不是整个状态。

请注意,为了提高效率,您应该在MapState上迭代一次,进行计算并(偶尔(更新条目,假设这是可能的。

最新更新