在 Flink 中序列化复杂模型的最佳实践



我正在使用UDF将一些ML模型应用于数据流。由于Model类(来自第三方库(无法由 Flink 自动序列化,因此我使用了两个变量,如下所示:

class MyUDF extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
with CheckpointedFunction {
// To hold loaded models
@transient private var models: HashMap[(String, String), Model] = _
// For serialization purposes
@transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _
...
}

哪里:

  • models保存加载(运行(的模型(从ModelDef创建,基本上是一个字符串(
  • modelsBytes是真正的(键控(状态,它包含相同的模型,但作为字节块,以便检查点正常工作。

整体解决方案很简单(只需要在恢复/保存模型时在我的模型上调用fromBytes/toBytes(,但我不知道这是否是一种常见/最佳实践。对于本质上相同的事物,有两个变量看起来像是一个怪癖。例如,在这里你可以找到一个使用TypeSerializer[Option[Model]]的例子,它看起来更干净,但实现起来也更复杂。

所以,基本上:

  • 我应该使用TypeSerializer方法还是可以对运行/序列化模型具有某种重复状态?

  • 另外,如果您能指出一些有关 Flink 中自定义类型序列化的文档/示例,那就太好了,通常我发现官方文档在这方面有点缺乏。

我可能会使用堆状态后端和自定义TypeSerializer

堆状态后端将仅在检查点上序列化数据,否则保持数据原样。因此,使用该后端而不是管理地图本身时,您几乎没有性能损失。但是,它将消除手动执行序列化和同步的需要。

为了性能而明智地对数据进行非规范化是一种非常常见的模式。 如果您没有使用太多内存,请坚持使用此方法。

相关内容

  • 没有找到相关文章

最新更新