Can anyone give me an example of a UDTF (e.g. explode) written in Scala that returns multiple rows, and show how to use it as a UDF in SparkSQL?
Table: table1
+------+----------+----------+
|userId|someString|      varA|
+------+----------+----------+
|     1|  example1| [0, 2, 5]|
|     2|  example2|[1, 20, 5]|
+------+----------+----------+
I would like to write Scala code along these lines:
def exampleUDTF(varA: Seq[Int]): <Return Type???> = {
  // code to explode varA field ???
}
sqlContext.udf.register("exampleUDTF", exampleUDTF _)
sqlContext.sql("FROM table1 SELECT userId, someString, exampleUDTF(varA)").collect().foreach(println)
Expected output:
+------+----------+----+
|userId|someString|varA|
+------+----------+----+
|     1|  example1|   0|
|     1|  example1|   2|
|     1|  example1|   5|
|     2|  example2|   1|
|     2|  example2|  20|
|     2|  example2|   5|
+------+----------+----+
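This cannot be done with a UDF: a UDF returns exactly one value per input row, so it can only add a single column to a DataFrame. However, there is a method called DataFrame.explode that you can use. With your example, you could do this: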
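import org.apache.spark.sql._
import sqlContext.implicits._ // provides toDF and the $"col" syntax

val df = Seq(
  (1, "example1", Array(0, 2, 5)),
  (2, "example2", Array(1, 20, 5))
).toDF("userId", "someString", "varA")

// explode calls the function once per input row and emits one output row per
// element it returns; the new column is named after the tuple field, "_1"
val explodedDf = df.explode($"varA") {
  case Row(arr: Seq[Int]) => arr.map(a => Tuple1(a))
}.drop($"varA").withColumnRenamed("_1", "varA")

explodedDf.show()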
+------+----------+-----+
|userId|someString| varA|
+------+----------+-----+
|     1|  example1|    0|
|     1|  example1|    2|
|     1|  example1|    5|
|     2|  example2|    1|
|     2|  example2|   20|
|     2|  example2|    5|
+------+----------+-----+
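Why a UDF cannot do this while explode can: a UDF maps each input row to exactly one value, whereas explode maps each input row to zero or more output rows, which is the shape of flatMap. The plain-Scala sketch below (no Spark involved) models table1's rows as tuples to contrast the two shapes; `Rec`, `udfStyle` and `explodeStyle` are hypothetical names, and the "UDF" that sums the array is an arbitrary example.

```scala
// A row of table1 modeled as a tuple: (userId, someString, varA)
type Rec = (Int, String, Seq[Int])

val table1: Seq[Rec] = Seq(
  (1, "example1", Seq(0, 2, 5)),
  (2, "example2", Seq(1, 20, 5))
)

// UDF shape: one input row -> one output value, so the row count never changes.
// This hypothetical "UDF" sums the array into a single new column value.
val udfStyle: Seq[(Int, String, Int)] =
  table1.map { case (id, s, xs) => (id, s, xs.sum) }

// explode shape: one input row -> one output row per array element,
// i.e. a flatMap over the rows.
val explodedRows: Seq[(Int, String, Int)] =
  table1.flatMap { case (id, s, xs) => xs.map(x => (id, s, x)) }

val explodeStyle = explodedRows
explodeStyle.foreach(println)
```

The row count of `udfStyle` always equals that of `table1`, which is exactly the limitation the answer describes.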
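Note that explode takes a function as an argument. So even though you cannot create a UDF that does what you want, you can write a function to pass to explode that does what you need. Like this: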
// A named function with the Row => TraversableOnce[A] shape that explode expects
def exploder(row: Row): Seq[Tuple1[Int]] = row match {
  case Row(arr: Seq[Int]) => arr.map(v => Tuple1(v))
}
df.explode($"varA")(exploder)
This is about the best you are going to get in terms of recreating a UDTF.
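The answers here use the Spark 1.x DataFrame.explode method, which was deprecated in Spark 2.0 in favour of flatMap or select with functions.explode. A sketch, assuming Spark 2.x or later and the question's table1 data, of three equivalent ways to get the expected output; the local session and its app name are illustrative only (in spark-shell a `spark` session already exists).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// Local session for illustration; "explode-sketch" is a hypothetical app name
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("explode-sketch")
  .getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "example1", Seq(0, 2, 5)),
  (2, "example2", Seq(1, 20, 5))
).toDF("userId", "someString", "varA")

// 1) DataFrame API: explode is a generator, so one input row
//    yields one output row per array element
val viaApi = df.select($"userId", $"someString", explode($"varA").as("varA"))

// 2) SQL: the same built-in generator called from a query
df.createOrReplaceTempView("table1")
val viaSql = spark.sql(
  "SELECT userId, someString, explode(varA) AS varA FROM table1")

// 3) Typed Dataset API: flatMap over the rows
val viaFlatMap = df.as[(Int, String, Seq[Int])]
  .flatMap { case (id, s, xs) => xs.map(x => (id, s, x)) }
  .toDF("userId", "someString", "varA")

viaApi.show()
```

The SQL form is the closest to what the question attempted; the Hive-style `LATERAL VIEW explode(varA)` clause is another way to write it.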
Hive table:
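+------------------------------------------------+--+
|name                                            |id|
+------------------------------------------------+--+
|["Subhajit Sen","Binoy Mondal","Shantanu Dutta"]|15|
|["Gobinathan SP","Harsh Gupta","Rahul Anand"]   |16|
+------------------------------------------------+--+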
Create a Scala function:
def toUpper(name: Seq[String]): Seq[String] = name.map(_.toUpperCase)
Register the function as a UDF:
sqlContext.udf.register("toUpper",toUpper _)
Call the UDF through sqlContext and store the output as a DataFrame:
val df = sqlContext.sql("SELECT toUpper(name) FROM namelist").toDF("Name")
Explode the DataFrame:
df.explode(df("Name")) {
  case org.apache.spark.sql.Row(arr: Seq[String]) => arr.map(v => Tuple1(v))
}.drop(df("Name")).withColumnRenamed("_1", "Name").show()
Result:
+--------------+
|          Name|
+--------------+
|  SUBHAJIT SEN|
|  BINOY MONDAL|
|SHANTANU DUTTA|
| GOBINATHAN SP|
|   HARSH GUPTA|
|   RAHUL ANAND|
+--------------+
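This pipeline chains two different shapes: toUpper is a one-value-per-row UDF (an array in, an array out), and only the explode step changes the row count. A plain-Scala model of the same two steps, with the Hive table's rows hard-coded as (name, id) tuples for illustration (no Spark or Hive involved; `namelist`, `upper` and `exploded` are hypothetical names):

```scala
// Same function as in the answer: array in, array out (one value per row)
def toUpper(name: Seq[String]): Seq[String] = name.map(_.toUpperCase)

// Rows of the namelist table modeled as (name, id) tuples
val namelist: Seq[(Seq[String], Int)] = Seq(
  (Seq("Subhajit Sen", "Binoy Mondal", "Shantanu Dutta"), 15),
  (Seq("Gobinathan SP", "Harsh Gupta", "Rahul Anand"), 16)
)

// Step 1, the UDF: still two rows, each holding an upper-cased array
val upper: Seq[Seq[String]] = namelist.map { case (names, _) => toUpper(names) }

// Step 2, the explode: one output row per array element
val exploded: Seq[String] = upper.flatten

exploded.foreach(println)
```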