我正在研究一个KeyedCoProcessFunction,看起来像这样:
class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
with CheckpointedFunction {
// To hold loaded models
@transient private var models: HashMap[(String, String), Model] = _
// For serialization purposes
@transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _
...
override def snapshotState(context: FunctionSnapshotContext): Unit = {
modelsBytes.clear() // This raises an exception when there is no active key set
for ((k, model) <- models) {
modelsBytes.put(k, model.toBytes(v))
}
}
override def initializeState(context: FunctionInitializationContext): Unit = {
modelsBytes = context.getKeyedStateStore.getMapState[String, String](
new MapStateDescriptor("modelsBytes", classOf[String], classOf[String])
)
if (context.isRestored) {
// restore models from modelsBytes
}
}
}
该状态由使用第三方库构建的 ML 模型集合组成。在检查点之前,我需要将加载的模型转储到snapshotState
中的字节数组中。
我的问题是,在snapshotState
中,当没有活动键时,modelsBytes.clear()
引发异常。当我从头开始启动应用程序而输入流上没有任何数据时,就会发生这种情况。因此,当检查点的时间到来时,我收到此错误:
java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
但是,当输入流包含数据时,检查点可以正常工作。我对此有点困惑,因为snapshotState
没有提供键控上下文(与processElement1
和processElement2
相反,其中当前键可以通过执行ctx.getCurrentKey
来访问(,所以在我看来,对clear
和put
的调用应该始终失败snapshotState
因为它们应该只在键控上下文中工作。谁能澄清这是否是预期的行为?
密钥状态只能用于文档中编写的密钥流。
* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning
* consistently together.
如果调用clear()
,则不会清除整个映射,而只是重置当前密钥的状态。密钥在processElementX
中始终是已知的。
/**
* Removes the value mapped under the current key.
*/
void clear();
当您尝试在processElementX
以外的函数中调用clear
时,您实际上应该会收到更好的异常。最后,您错误地使用了键控状态。
现在谈谈你的实际问题。我假设您使用的是KeyedCoProcessFunction
,因为模型在单独的输入中更新。如果它们是静态的,则可以从静态源(例如,包含在 jar 中(open
加载它们。此外,通常只有一个模型应用于具有不同键的所有值,然后您可以使用BroadCast
状态。因此,我假设您对由键分隔的不同类型的数据有不同的模型。
如果它们来自 input2,那么您已经在调用processElement2
时序列化了它们。
override def processElement2(model: Model, ctx: Context, collector): Unit = {
models.put(ctx.getCurrentKey, model)
modelsBytes.put(ctx.getCurrentKey, model.toBytes(v))
}
然后,您将不会覆盖snapshotState
,因为状态已经是最新的。initializeState
会急切地反序列化模型,或者您也可以在processElement1
中懒惰地实现它们。