我正在使用UDF将一些ML模型应用于数据流。由于Model
类(来自第三方库(无法由 Flink 自动序列化,因此我使用了两个变量,如下所示:
class MyUDF extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
with CheckpointedFunction {
// To hold loaded models
@transient private var models: HashMap[(String, String), Model] = _
// For serialization purposes
@transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _
...
}
哪里:
models
保存加载(运行(的模型(从ModelDef
创建,基本上是一个字符串(modelsBytes
是真正的(键控(状态,它包含相同的模型,但作为字节块,以便检查点正常工作。
整体解决方案很简单(只需要在恢复/保存模型时在我的模型上调用fromBytes
/toBytes
(,但我不知道这是否是一种常见/最佳实践。对于本质上相同的事物,有两个变量看起来像是一个怪癖。例如,在这里你可以找到一个使用TypeSerializer[Option[Model]]
的例子,它看起来更干净,但实现起来也更复杂。
所以,基本上:
我应该使用
TypeSerializer
方法还是可以对运行/序列化模型具有某种重复状态?另外,如果您能指出一些有关 Flink 中自定义类型序列化的文档/示例,那就太好了,通常我发现官方文档在这方面有点缺乏。
我可能会使用堆状态后端和自定义TypeSerializer
。
堆状态后端将仅在检查点上序列化数据,否则保持数据原样。因此,使用该后端而不是管理地图本身时,您几乎没有性能损失。但是,它将消除手动执行序列化和同步的需要。
为了性能而明智地对数据进行非规范化是一种非常常见的模式。 如果您没有使用太多内存,请坚持使用此方法。