我想创建一个执行以下操作的UDF:
a DataFrame
有5列,并且想要创建第六列,其中包含名称的第一个和第二列的值。
让我打印DataFrame
并用它解释:
case class salary(c1: String, c2: String, c3: Int, c4: Int, c5: Int)
val df = Seq(
salary("c3", "c4", 7, 5, 6),
salary("c5", "c4", 8, 10, 20),
salary("c5", "c3", 1, 4, 9))
.toDF()
DataFrame
结果
+---+---+---+---+---+
| c1| c2| c3| c4| c5|
+---+---+---+---+---+
| c3| c4| 7| 5| 6|
| c5| c4| 8| 10| 20|
| c5| c3| 1| 4| 9|
+---+---+---+---+---+
df.withColumn("c6",UDFName(c1,c2))
,此列的结果应为:
1º行(C3,C4),然后7 5 = 12
2º行(C5,C4),然后20 10 = 30
3º行(C5,C3),然后9 1 = 10
确实不需要UDF。只需使用虚拟MapType
列:
import org.apache.spark.sql.functions.{col, lit, map}
// We use an interleaved list of column name and column value
val values = map(Seq("c3", "c4", "c5").flatMap(c => Seq(lit(c), col(c))): _*)
// Check the first row
df.select(values).limit(1).show(false)
+------------------------------+
|map(c3, c3, c4, c4, c5, c5) |
+------------------------------+
|Map(c3 -> 7, c4 -> 5, c5 -> 6)|
+------------------------------+
并在表达式中使用它:
df.withColumn("c6", values($"c1") + values($"c2"))
+---+---+---+---+---+---+
| c1| c2| c3| c4| c5| c6|
+---+---+---+---+---+---+
| c3| c4| 7| 5| 6| 12|
| c5| c4| 8| 10| 20| 30|
| c5| c3| 1| 4| 9| 10|
+---+---+---+---+---+---+
比处理UDFs
和Rows
:
import org.apache.spark.sql.functions.{struct, udf}
import org.apache.spark.sql.Row
val f = udf((row: Row) => for {
// Use Options to avoid problems with null columns
// Explicit null checks should be faster, but much more verbose
c1 <- Option(row.getAs[String]("c1"))
c2 <- Option(row.getAs[String]("c2"))
// In this case we could (probably) skip Options below
// but Ints in Spark SQL can get null
x <- Option(row.getAs[Int](c1))
y <- Option(row.getAs[Int](c2))
} yield x + y)
df.withColumn("c6", f(struct(df.columns map col: _*)))
+---+---+---+---+---+---+
| c1| c2| c3| c4| c5| c6|
+---+---+---+---+---+---+
| c3| c4| 7| 5| 6| 12|
| c5| c4| 8| 10| 20| 30|
| c5| c3| 1| 4| 9| 10|
+---+---+---+---+---+---+
用户定义的函数(UDF)可以访问直接传递为输入参数的值。
如果要访问其他列,则UDF只能访问它们 iff ,您将其作为输入参数传递。这样,您应该很容易地实现自己追求的目标。
我强烈建议使用struct函数组合所有其他列。
struct(cols:column*):列创建一个新的结构列。
您也可以使用dataset.columns方法将列访问struct
。
列:array [string] 返回所有列名称为阵列。