Extending DefaultParamsReadable and DefaultParamsWritable does not allow reading a custom model



Hello,

For a few days now I have been struggling to save a custom transformer that is part of a large pipeline of stages. I have a transformer that is completely defined by its params. I have an estimator whose fit method generates a matrix and then sets the transformer's params accordingly, so that I can use DefaultParamsReadable and DefaultParamsWritable to take advantage of the serialization/deserialization already present in util.ReadWrite.scala.

My code, reduced to the important aspects, looks like this:

...
import org.apache.spark.ml.linalg.DenseMatrix
import org.apache.spark.ml.param.{Param, ParamMap, Params}
import org.apache.spark.ml.util._
...
// Trait mixed into both the Estimator and the Transformer so they share params
trait NBParams extends Params {

  final val featuresCol = new Param[String](this, "featuresCol", "The input column")
  setDefault(featuresCol, "_tfIdfOut")

  final val labelCol = new Param[String](this, "labelCol", "The labels column")
  setDefault(labelCol, "P_Root_Code_Index")

  final val predictionsCol = new Param[String](this, "predictionsCol", "The output column")
  setDefault(predictionsCol, "NBOutput")

  final val ratioMatrix = new Param[DenseMatrix](this, "ratioMatrix", "The transformation matrix")

  def getFeaturesCol: String = $(featuresCol)
  def getLabelCol: String = $(labelCol)
  def getPredictionsCol: String = $(predictionsCol)
  def getRatioMatrix: DenseMatrix = $(ratioMatrix)
}

// Estimator
class CustomNaiveBayes(override val uid: String, val alpha: Double)
  extends Estimator[CustomNaiveBayesModel] with NBParams with DefaultParamsWritable {

  def copy(extra: ParamMap): CustomNaiveBayes = defaultCopy(extra)

  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
  def setLabelCol(value: String): this.type = set(labelCol, value)
  def setPredictionCol(value: String): this.type = set(predictionsCol, value)
  def setRatioMatrix(value: DenseMatrix): this.type = set(ratioMatrix, value)

  override def transformSchema(schema: StructType): StructType = {...}

  override def fit(ds: Dataset[_]): CustomNaiveBayesModel = {
    ...
    // ratioMatrix here is assumed to be the DenseMatrix computed in the elided code above
    val model = new CustomNaiveBayesModel(uid)
    model
      .setRatioMatrix(ratioMatrix)
      .setFeaturesCol($(featuresCol))
      .setLabelCol($(labelCol))
      .setPredictionCol($(predictionsCol))
  }
}
// companion object for Estimator
object CustomNaiveBayes extends DefaultParamsReadable[CustomNaiveBayes] {
  override def load(path: String): CustomNaiveBayes = super.load(path)
}
// Transformer
class CustomNaiveBayesModel(override val uid: String)
  extends Model[CustomNaiveBayesModel] with NBParams with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("customnaivebayes"))

  def copy(extra: ParamMap): CustomNaiveBayesModel = defaultCopy(extra)

  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
  def setLabelCol(value: String): this.type = set(labelCol, value)
  def setPredictionCol(value: String): this.type = set(predictionsCol, value)
  def setRatioMatrix(value: DenseMatrix): this.type = set(ratioMatrix, value)

  override def transformSchema(schema: StructType): StructType = {...}

  def transform(dataset: Dataset[_]): DataFrame = {...}
}

// companion object for Transformer
object CustomNaiveBayesModel extends DefaultParamsReadable[CustomNaiveBayesModel] 

When I add this model as part of a pipeline and fit the pipeline, everything works. Saving the pipeline also produces no errors. However, when I try to load the pipeline back in, I get the following error:

NoSuchMethodException:$line3b380bcad77e4e84ae25a6bb1f3ec0d45.$read$$iw$$iw$$$iw$$iw$CustomNaiveBayesModel.read()

To save the pipeline, which includes a number of other transformers for NLP-related preprocessing, I run

fittedModelRootCode.write.save("path")

and then to load it (which is where the failure occurs), I run

import org.apache.spark.ml.PipelineModel
val fittedModelRootCode = PipelineModel.load("path")

The model itself appears to work well, but it is not feasible to retrain it on a dataset every time I want to use it. Does anyone know why the read() method appears to be unavailable even though the companion objects exist?

Notes:

  • I am running on Databricks Runtime 8.3 (Spark 3.1.1, Scala 2.12)
  • My model lives in a separate package, so it is external to Spark
  • I modeled this on several existing examples that all appear to work, so I am unsure why my code fails
  • I am aware that Spark ML ships a Naive Bayes model; however, I am required to make heavy customizations, so modifying the existing version is not worthwhile (besides, I would like to learn how to do this properly)

Any help is greatly appreciated.

Since you extended the CustomNaiveBayesModel companion object with DefaultParamsReadable, I think you should use the companion object CustomNaiveBayesModel to load the model. Here is some code I wrote to save and load the model, and it works fine:

import org.apache.spark.SparkConf
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.SparkSession
import path.to.CustomNaiveBayesModel

object SavingModelApp extends App {
  val spark: SparkSession = SparkSession.builder().config(
    new SparkConf()
      .setMaster("local[*]")
      .setAppName("Test app")
      .set("spark.driver.host", "localhost")
      .set("spark.ui.enabled", "false")
  ).getOrCreate()

  val training = spark.createDataFrame(Seq(
    (0L, "a b c d e spark", 1.0),
    (1L, "b d", 0.0),
    (2L, "spark f g h", 1.0),
    (3L, "hadoop mapreduce", 0.0)
  )).toDF("id", "text", "label")

  // Fit a pipeline whose single stage is the custom model, save it, then load it back
  val fittedModelRootCode: PipelineModel =
    new Pipeline().setStages(Array(new CustomNaiveBayesModel())).fit(training)
  fittedModelRootCode.write.save("path/to/model")
  val mod = PipelineModel.load("path/to/model")
}

I think your mistake is loading a concrete (single) model with PipelineModel.load.
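For illustration, here is a minimal sketch of that distinction; the save paths and the nbModel/fittedPipeline values are hypothetical stand-ins for a fitted CustomNaiveBayesModel and a fitted pipeline:

import org.apache.spark.ml.PipelineModel
import path.to.CustomNaiveBayesModel

// A single stage saved on its own is read back through its own companion
// object (the load method provided by DefaultParamsReadable), not PipelineModel.
nbModel.write.save("path/to/nb-model")
val stage: CustomNaiveBayesModel = CustomNaiveBayesModel.load("path/to/nb-model")

// A whole fitted pipeline, by contrast, is saved and loaded via PipelineModel.
fittedPipeline.write.save("path/to/pipeline")
val pipeline: PipelineModel = PipelineModel.load("path/to/pipeline")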

My environment:

scalaVersion := "2.12.6"
scalacOptions := Seq(
  "-encoding", "UTF-8", "-target:jvm-1.8", "-deprecation",
  "-feature", "-unchecked", "-language:implicitConversions", "-language:postfixOps")
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.1"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.1.1"
