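I am trying to create a Scala UDF for Spark that can be used in Spark SQL. The function should accept a column of any type and wrap it in an ArrayType, unless the input is already an ArrayType.
This is the code I have so far: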
package com.my_namespace.spark.udf
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
class GetDatatype extends UDF1[Object, scala.collection.Seq[_]] {
  override def call(inputObject: Object): scala.collection.Seq[_] = {
    if (inputObject.isInstanceOf[scala.collection.Seq[_]]) {
      return inputObject.asInstanceOf[scala.collection.Seq[_]]
    } else {
      return Array(inputObject)
    }
  }
}
val myFunc = new GetDatatype().call _
val myFuncUDF = udf(myFunc)
spark.udf.register("myFuncUDF", myFuncUDF)
The data might look like this:
+-----------+-----------+--------------------------------------------------------------+--------+-----------------+
|create_date|item       |datatype_of_item                                              |item2   |datatype_of_item2|
+-----------+-----------+--------------------------------------------------------------+--------+-----------------+
|2021-06-01 |[item 3, 3]|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|string 3|java.lang.String |
+-----------+-----------+--------------------------------------------------------------+--------+-----------------+
Or:
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
|create_date|item                      |datatype_of_item                           |item2               |datatype_of_item_2                         |
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
|2021-05-01 |[[item 1, 1], [item 2, 2]]|scala.collection.mutable.WrappedArray$ofRef|[string 1, string 2]|scala.collection.mutable.WrappedArray$ofRef|
|2021-06-01 |[[item 3, 3]]             |scala.collection.mutable.WrappedArray$ofRef|[string 3]          |scala.collection.mutable.WrappedArray$ofRef|
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
The UDF may be passed the contents of either the item column or the item2 column.
However, when this line is executed:
val myFuncUDF = udf(myFunc)
I get the following error:
scala> val myFuncUDF = udf(myFunc)
java.lang.UnsupportedOperationException: Schema for type Any is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:743)
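Spark cannot build a UDF with this return type (Any, or Object). udf derives the result schema from the function's static Scala type by reflection, and Seq[_] has element type Any, which has no corresponding Spark SQL type. I think this can be done without a UDF: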
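import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.sql.types.ArrayType

val df = Seq(
  (Seq((1, "a"), (2, "b")), (1, "a"))
).toDF("item", "item 2")
// Decide from the schema: keep array columns as they are, wrap everything else in array(...).
def wrapInArray(df: DataFrame, c: String): Column = if (df.schema(c).dataType.isInstanceOf[ArrayType]) col(c) else array(col(c))
df
  .withColumn("test", wrapInArray(df, "item"))
  .withColumn("test 2", wrapInArray(df, "item 2"))
  .printSchema()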
This gives the schema:
root
 |-- item: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)
 |-- item 2: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: string (nullable = true)
 |-- test: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)
 |-- test 2: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)
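Since the question asks for something usable from Spark SQL, the same schema check can also be used to generate the SELECT list of a SQL query. The following is only a rough sketch under a few assumptions: the view name items and the helpers quoted and wrapInArraySql are made up for illustration, and a local SparkSession is created so the snippet can be run on its own (in spark-shell the existing session is reused).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.ArrayType

// Assumption: a local session for illustration; getOrCreate() reuses an existing one.
val spark = SparkSession.builder().master("local[1]").appName("wrap-in-array-sql").getOrCreate()
import spark.implicits._

// Hypothetical helper: backtick-quote a column name so names with spaces ("item 2") parse in SQL.
def quoted(name: String): String = "`" + name.replace("`", "``") + "`"

// Hypothetical helper: SELECT-list entry that keeps array columns as they are
// and wraps any other column in array(...), keeping the original column name.
def wrapInArraySql(df: DataFrame, c: String): String =
  if (df.schema(c).dataType.isInstanceOf[ArrayType]) quoted(c)
  else s"array(${quoted(c)}) AS ${quoted(c)}"

// Same sample data as above: "item" is already an array, "item 2" is a struct.
val df = Seq((Seq((1, "a"), (2, "b")), (1, "a"))).toDF("item", "item 2")
df.createOrReplaceTempView("items") // "items" is an assumed view name

// Build the query text from the schema, then run it through Spark SQL.
val selectList = df.columns.map(wrapInArraySql(df, _)).mkString(", ")
val wrapped = spark.sql(s"SELECT $selectList FROM items")
wrapped.printSchema()
```

The decision is made once, when the query text is built, rather than per row, so no UDF (and no Any-typed return value) is involved. The trade-off is that the SQL has to be generated by code that can see the schema. It cannot be written as a single fixed SQL function call.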