如何将UDF用值作为对其他列的引用



我想创建一个执行以下操作的UDF:

a DataFrame有5列,并且想要创建第六列,其中包含名称的第一个和第二列的值。

让我打印DataFrame并用它解释:

case class salary(c1: String, c2: String, c3: Int, c4: Int, c5: Int)
val df = Seq(
    salary("c3", "c4", 7, 5, 6),
    salary("c5", "c4", 8, 10, 20),
    salary("c5", "c3", 1, 4, 9))
    .toDF()

DataFrame结果

+---+---+---+---+---+
| c1| c2| c3| c4| c5|
+---+---+---+---+---+
| c3| c4|  7|  5|  6|
| c5| c4|  8| 10| 20|
| c5| c3|  1|  4|  9|
+---+---+---+---+---+
df.withColumn("c6",UDFName(c1,c2))

,此列的结果应为:

1º行(C3,C4),然后7 5 = 12

2º行(C5,C4),然后20 10 = 30

3º行(C5,C3),然后9 1 = 10

确实不需要UDF。只需使用虚拟MapType列:

import org.apache.spark.sql.functions.{col, lit, map}
// We use an interleaved list of column name and column value
val values = map(Seq("c3", "c4", "c5").flatMap(c => Seq(lit(c), col(c))): _*)
// Check the first row
df.select(values).limit(1).show(false)
+------------------------------+
|map(c3, c3, c4, c4, c5, c5)   |
+------------------------------+
|Map(c3 -> 7, c4 -> 5, c5 -> 6)|
+------------------------------+

并在表达式中使用它:

df.withColumn("c6", values($"c1") + values($"c2"))
+---+---+---+---+---+---+
| c1| c2| c3| c4| c5| c6|
+---+---+---+---+---+---+
| c3| c4|  7|  5|  6| 12|
| c5| c4|  8| 10| 20| 30|
| c5| c3|  1|  4|  9| 10|
+---+---+---+---+---+---+ 

比处理UDFsRows

要清洁,更快,更安全
import org.apache.spark.sql.functions.{struct, udf}
import org.apache.spark.sql.Row
val f = udf((row: Row) => for {
  // Use Options to avoid problems with null columns
  // Explicit null checks should be faster, but much more verbose
  c1 <- Option(row.getAs[String]("c1"))
  c2 <- Option(row.getAs[String]("c2"))
  // In this case we could (probably) skip Options below
  // but Ints in Spark SQL can get null
  x <- Option(row.getAs[Int](c1))
  y <- Option(row.getAs[Int](c2))
} yield x + y)
df.withColumn("c6", f(struct(df.columns map col: _*)))
+---+---+---+---+---+---+
| c1| c2| c3| c4| c5| c6|
+---+---+---+---+---+---+
| c3| c4|  7|  5|  6| 12|
| c5| c4|  8| 10| 20| 30|
| c5| c3|  1|  4|  9| 10|
+---+---+---+---+---+---+ 

用户定义的函数(UDF)可以访问直接传递为输入参数的值。

如果要访问其他列,则UDF只能访问它们 iff ,您将其作为输入参数传递。这样,您应该很容易地实现自己追求的目标。

我强烈建议使用struct函数组合所有其他列。

struct(cols:column*):列创建一个新的结构列。

您也可以使用dataset.columns方法将列访问struct

列:array [string] 返回所有列名称为阵列。

相关内容

  • 没有找到相关文章

最新更新