使用calludf创建一种链条UDF调用的方法



i monkey修补了org.apache.spark.sql.Column类以添加chainUDF方法。它适用于不采取争论的UDF,我需要帮助,以使其成为参数的UDF的通用。

这是当前的chainUDF方法定义。

object ColumnExt {
  implicit class ColumnMethods(c: Column) {
    def chainUDF(udfName: String): Column = {
      callUDF(udfName, c)
    }
  }
}

这是行动中的chainUDF方法。

def appendZ(s: String): String = {
  s"${s}Z"
}
spark.udf.register("appendZUdf", appendZ _)
def prependA(s: String): String = {
  s"A${s}"
}
spark.udf.register("prependAUdf", prependA _)
val hobbiesDf = Seq(
  ("dance"),
  ("sing")
).toDF("word")
val actualDf = hobbiesDf.withColumn(
  "fun",
  col("word").chainUDF("appendZUdf").chainUDF("prependAUdf")
)

我想更新chainUDF方法定义,以获取Column参数的可选列表。这样的东西:

def appendWord(s: String, word: String): String = {
  s"${s}${word}"
}
spark.udf.register("appendWordUdf", appendWord _)
val hobbiesDf = Seq(
  ("dance"),
  ("sing")
).toDF("word")
val actualDf = hobbiesDf.withColumn(
  "fun",
  col("word").chainUDF("appendZUdf").chainUDF("appendWordUdf", lit("cool"))
)

我认为我们需要将chainUDF方法定义更新为这样的东西:

object ColumnExt {
  implicit class ColumnMethods(c: Column) {
    def chainUDF(udfName: String, cols: Column* = some_default_value): Column = {
      callUDF(udfName, c + cols)
    }
  }
}

我敢肯定有一些Scala魔术可以实现这一目标。

签名是:

def callUDF(udfName: String, cols: Column*): Column

所以您不需要魔术:

def chainUDF(udfName: String, cols: Column* = some_default_value): Column = {
  callUDF(udfName, c +: cols: _*)
}

最新更新