我想在scala中为spark 2.0中的管道编写一个自定义的Transformer
。到目前为止,我还不清楚copy
或transformSchema
方法应该返回什么。它们返回一个null
对吗?https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java获取副本?
当Transformer
扩展PipelineStage
时,我得出结论,fit
调用transformSchema
方法。我是否正确理解transformSchema
类似于ask - learning契合度?
由于我的Transformer
应该与(非常小的)第二个数据集连接数据集,我也想将该数据集存储在序列化管道中。我应该如何将其存储在转换器中以正确地使用管道序列化机制?
计算单个列的平均值并填充nan值并保存该值的简单转换器看起来如何?
@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {
def transform(df: Dataset[MyClass]): DataFrame = {
}
override def copy(extra: ParamMap): Transformer = {
}
override def transformSchema(schema: StructType): StructType = {
schema
}
}
transformSchema
应该返回应用Transformer
后预期的模式。例子:
-
如果变压器添加
IntegerType
列,输出列名称为foo
:import org.apache.spark.sql.types._ override def transformSchema(schema: StructType): StructType = { schema.add(StructField("foo", IntegerType)) }
这在Spark SQL(和MLlib)中是不可能的,因为所以如果数据集的模式没有改变,因为只有一个名称值被填充为平均值输入,我应该返回原始的case类作为模式?
Dataset
是不可变的一旦创建。您只能添加或"替换"(即添加之后的drop
操作)列。
首先,我不确定你想要一个Transformer
本身(或UnaryTransformer
作为@LostInOverflow建议的答案),你说:
计算单个列的平均值并填充nan值并保存该值的简单转换器看起来如何?
对我来说,这就好像你想应用一个聚合函数(又名聚合)并将它与所有列"连接"以产生最终值或NaN。
它看起来像你想要一个groupBy
为mean
做聚合,然后join
也可以是一个窗口聚合。
无论如何,我将从UnaryTransformer
开始,这将解决你的问题中的第一个问题:
到目前为止,我还不清楚
copy
或transformSchema
方法应该返回什么。它们返回null正确吗?
请参阅GitHub上的完整项目spark-mllib-custom-transformer,我在其中实现了UnaryTransformer
到toUpperCase
的字符串列,其中UnaryTransformer看起来如下:
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}
class UpperTransformer(override val uid: String)
extends UnaryTransformer[String, String, UpperTransformer] {
def this() = this(Identifiable.randomUID("upp"))
override protected def createTransformFunc: String => String = {
_.toUpperCase
}
override protected def outputDataType: DataType = StringType
}