我想将Hive中所有现有的UDTF转换为Scala函数,并从Spark SQL中使用它



有人能给我一个用scala编写的UDTF(例如:爆炸)的例子吗?它返回多行并将其用作SparkSQL中的UDF?

表:表1

+------+----------+----------+
|userId|someString|      varA|
+------+----------+----------+
|     1|  example1| [0, 2, 5]|
|     2|  example2|[1, 20, 5]|
+------+----------+----------+

我想创建以下Scala代码:

def exampleUDTF(var: Seq[Int]) = <Return Type???>  {
  // code to explode varA field ???
}
sqlContext.udf.register("exampleUDTF",exampleUDTF _)
sqlContext.sql("FROM table1 SELECT userId, someString, exampleUDTF(varA)").collect().foreach(println)

预期输出:

+------+----------+----+
|userId|someString|varA|
+------+----------+----+
|     1|  example1|   0|
|     1|  example1|   2|
|     1|  example1|   5|
|     2|  example2|   1|
|     2|  example2|  20|
|     2|  example2|   5|
+------+----------+----+

使用UDF无法执行此操作。UDF只能向DataFrame添加一列。但是,有一个名为DataFrame.explode的函数,您可以使用它。用你的例子,你可以这样做:

import org.apache.spark.sql._
val df = Seq(
  (1,"example1", Array(0,2,5)),
  (2,"example2", Array(1,20,5))
).toDF("userId", "someString", "varA")
val explodedDf = df.explode($"varA"){
  case Row(arr: Seq[Int]) => arr.toArray.map(a => Tuple1(a))
}.drop($"varA").withColumnRenamed("_1", "varA")
+------+----------+-----+
|userId|someString| varA|
+------+----------+-----+
|     1|  example1|    0|
|     1|  example1|    2|
|     1|  example1|    5|
|     2|  example2|    1|
|     2|  example2|   20|
|     2|  example2|    5|
+------+----------+-----+

注意,explode将函数作为自变量。因此,即使不能创建一个UDF来执行您想要的操作,也可以创建一个函数传递给explode来执行您需要的操作。像这样:

def exploder(row: Row) : Array[Tuple1[Int]] = {
  row match { case Row(arr) => arr.toArray.map(v => Tuple1(v)) }
}
df.explode($"varA")(exploder)

这大约是在重新创建UDTF方面你将得到的最好的结果。

配置单元表:

name                                               id
["Subhajit Sen","Binoy Mondal","Shantanu Dutta"]   15
["Gobinathan SP","Harsh Gupta","Rahul Anand"]      16
  1. 创建scala函数:

    def toUpper(name: Seq[String]) = (name.map(a => a.toUpperCase)).toSeq

  2. 将函数注册为UDF:sqlContext.udf.register("toUpper",toUpper _)

  3. 使用sqlContext调用UDF并将输出存储为DataFrame对象:

var df = sqlContext.sql("SELECT toUpper(name) FROM namelist").toDF("Name")

  1. 分解数据帧:df.explode(df("Name")){case org.apache.spark.sql.Row(arr: Seq[String]) => arr.toSeq.map(v => Tuple1(v))}.drop(df("Name")).withColumnRenamed("_1","Name").show

结果:

+--------------+
|          Name|
+--------------+
|  SUBHAJIT SEN|
|  BINOY MONDAL|
|SHANTANU DUTTA|
| GOBINATHAN SP|
|   HARSH GUPTA|
|   RAHUL ANAND|
+--------------+

相关内容

  • 没有找到相关文章

最新更新