为 ML 模型服务的 KeyedCoProcessFunction 上的状态处理



我正在研究一个KeyedCoProcessFunction,看起来像这样:

class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
with CheckpointedFunction {
// To hold loaded models
@transient private var models: HashMap[(String, String), Model] = _
// For serialization purposes
@transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _
...
override def snapshotState(context: FunctionSnapshotContext): Unit = {
modelsBytes.clear() // This raises an exception when there is no active key set
for ((k, model) <- models) {
modelsBytes.put(k, model.toBytes(v))
}
}
override def initializeState(context: FunctionInitializationContext): Unit = {
modelsBytes = context.getKeyedStateStore.getMapState[String, String](
new MapStateDescriptor("modelsBytes", classOf[String], classOf[String])
)
if (context.isRestored) {
// restore models from modelsBytes
}
}
}

该状态由使用第三方库构建的 ML 模型集合组成。在检查点之前,我需要将加载的模型转储到snapshotState中的字节数组中。

我的问题是,在snapshotState中,当没有活动键时,modelsBytes.clear()引发异常。当我从头开始启动应用程序而输入流上没有任何数据时,就会发生这种情况。因此,当检查点的时间到来时,我收到此错误:

java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.

但是,当输入流包含数据时,检查点可以正常工作。我对此有点困惑,因为snapshotState没有提供键控上下文(与processElement1processElement2相反,其中当前键可以通过执行ctx.getCurrentKey来访问(,所以在我看来,对clearput的调用应该始终失败snapshotState因为它们应该只在键控上下文中工作。谁能澄清这是否是预期的行为?

密钥状态只能用于文档中编写的密钥流。

* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning
* consistently together.

如果调用clear(),则不会清除整个映射,而只是重置当前密钥的状态。密钥在processElementX中始终是已知的。

/**
* Removes the value mapped under the current key.
*/
void clear();

当您尝试在processElementX以外的函数中调用clear时,您实际上应该会收到更好的异常。最后,您错误地使用了键控状态。

现在谈谈你的实际问题。我假设您使用的是KeyedCoProcessFunction,因为模型在单独的输入中更新。如果它们是静态的,则可以从静态源(例如,包含在 jar 中(open加载它们。此外,通常只有一个模型应用于具有不同键的所有值,然后您可以使用BroadCast状态。因此,我假设您对由键分隔的不同类型的数据有不同的模型。

如果它们来自 input2,那么您已经在调用processElement2时序列化了它们。

override def processElement2(model: Model, ctx: Context, collector): Unit = {
models.put(ctx.getCurrentKey, model)
modelsBytes.put(ctx.getCurrentKey, model.toBytes(v))
}

然后,您将不会覆盖snapshotState,因为状态已经是最新的。initializeState会急切地反序列化模型,或者您也可以在processElement1中懒惰地实现它们。

相关内容

  • 没有找到相关文章

最新更新