如何在MLlib中编写自定义Transformer



我想在scala中为spark 2.0中的管道编写一个自定义的Transformer。到目前为止,我还不清楚copytransformSchema方法应该返回什么。它们返回一个null对吗?https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java获取副本?

Transformer扩展PipelineStage时,我得出结论,fit调用transformSchema方法。我是否正确理解transformSchema类似于ask - learning契合度?

由于我的Transformer应该与(非常小的)第二个数据集连接数据集,我也想将该数据集存储在序列化管道中。我应该如何将其存储在转换器中以正确地使用管道序列化机制?

计算单个列的平均值并填充nan值并保存该值的简单转换器看起来如何?

@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
    class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {
      def transform(df: Dataset[MyClass]): DataFrame = {
      }
      override def copy(extra: ParamMap): Transformer = {
      }
      override def transformSchema(schema: StructType): StructType = {
        schema
      }
    }

transformSchema应该返回应用Transformer后预期的模式。例子:

  • 如果变压器添加IntegerType列,输出列名称为foo:

    import org.apache.spark.sql.types._
    override def transformSchema(schema: StructType): StructType = {
       schema.add(StructField("foo", IntegerType))
    }
    

所以如果数据集的模式没有改变,因为只有一个名称值被填充为平均值输入,我应该返回原始的case类作为模式?

这在Spark SQL(和MLlib)中是不可能的,因为Dataset不可变的一旦创建。您只能添加或"替换"(即添加之后的drop操作)列。

首先,我不确定你想要一个Transformer本身(或UnaryTransformer作为@LostInOverflow建议的答案),你说:

计算单个列的平均值并填充nan值并保存该值的简单转换器看起来如何?

对我来说,这就好像你想应用一个聚合函数(又名聚合)并将它与所有列"连接"以产生最终值或NaN。

看起来像你想要一个groupBymean做聚合,然后join也可以是一个窗口聚合。

无论如何,我将从UnaryTransformer开始,这将解决你的问题中的第一个问题:

到目前为止,我还不清楚copytransformSchema方法应该返回什么。它们返回null正确吗?

请参阅GitHub上的完整项目spark-mllib-custom-transformer,我在其中实现了UnaryTransformertoUpperCase的字符串列,其中UnaryTransformer看起来如下:

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}
class UpperTransformer(override val uid: String)
  extends UnaryTransformer[String, String, UpperTransformer] {
  def this() = this(Identifiable.randomUID("upp"))
  override protected def createTransformFunc: String => String = {
    _.toUpperCase
  }
  override protected def outputDataType: DataType = StringType
}

相关内容

  • 没有找到相关文章