Scala Spark UDF that takes an input and puts it into an array



I am trying to create a Scala UDF for Spark that can be used from Spark SQL. The goal of the function is to accept any column type as input and wrap it in an ArrayType, unless the input is already an ArrayType.

Here is the code I have so far:

package com.my_namespace.spark.udf

import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession

class GetDatatype extends UDF1[Object, scala.collection.Seq[_]] {
  override def call(inputObject: Object): scala.collection.Seq[_] = {
    if (inputObject.isInstanceOf[scala.collection.Seq[_]]) {
      inputObject.asInstanceOf[scala.collection.Seq[_]]
    } else {
      Array(inputObject)
    }
  }
}

val myFunc = new GetDatatype().call _
val myFuncUDF = udf(myFunc)
spark.udf.register("myFuncUDF", myFuncUDF)

The data may look like this:

+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+
|create_date|item       |datatype_of_item                                              |item2   |datatype_of_item2              |
+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+
|2021-06-01 |[item 3, 3]|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|string 3|java.lang.String               |
+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+

or like this:

+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
|create_date|item                      |datatype_of_item                           |item2               |datatype_of_item_2                         |
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
|2021-05-01 |[[item 1, 1], [item 2, 2]]|scala.collection.mutable.WrappedArray$ofRef|[string 1, string 2]|scala.collection.mutable.WrappedArray$ofRef|
|2021-06-01 |[[item 3, 3]]             |scala.collection.mutable.WrappedArray$ofRef|[string 3]          |scala.collection.mutable.WrappedArray$ofRef|
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+

The UDF may be passed the contents of either the item column or the item2 column; the intended usage from Spark SQL would look like the sketch below.
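For context, once the UDF is registered the call would be something like the following (a minimal sketch; the table name events is hypothetical):

// Hypothetical usage of the registered UDF from Spark SQL: wrap each column
// into an array regardless of its original type. events is a made-up name.
spark.sql("SELECT create_date, myFuncUDF(item) AS item_arr, myFuncUDF(item2) AS item2_arr FROM events").show(false)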

However, when this line is executed:

val myFuncUDF = udf(myFunc)

I get the following error:

scala> val myFuncUDF = udf(myFunc)
java.lang.UnsupportedOperationException: Schema for type Any is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:743)

Spark cannot use a UDF with that return type (Any, or Object). I think this can be done without a UDF:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.sql.types.ArrayType
import spark.implicits._

val df = Seq(
  (Seq((1, "a"), (2, "b")), (1, "a"))
).toDF("item", "item 2")

// wrap a column in array() unless it is already an ArrayType
def wrapInArray(df: DataFrame, c: String) =
  if (df.schema(c).dataType.isInstanceOf[ArrayType]) col(c) else array(col(c))

df.withColumn("test", wrapInArray(df, "item"))
  .withColumn("test 2", wrapInArray(df, "item 2"))
  .printSchema()

which gives the schema:

root
|-- item: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- _1: integer (nullable = false)
|    |    |-- _2: string (nullable = true)
|-- item 2: struct (nullable = true)
|    |-- _1: integer (nullable = false)
|    |-- _2: string (nullable = true)
|-- test: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- _1: integer (nullable = false)
|    |    |-- _2: string (nullable = true)
|-- test 2: array (nullable = false)
|    |-- element: struct (containsNull = true)
|    |    |-- _1: integer (nullable = false)
|    |    |-- _2: string (nullable = true)
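The root cause of the error is that Spark's ScalaReflection cannot derive a DataType for Any, Object, or Seq[_], so any UDF whose return type erases to one of those fails at udf(...) time. If a UDF callable from Spark SQL is still required, one workaround (a minimal sketch, assuming the element type of each column is known up front) is to register one concretely typed wrapper per type:

import org.apache.spark.sql.functions.udf

// Minimal sketch: Spark can derive a schema for Seq[String], so a UDF fixed
// to a concrete type registers fine. wrapString is a hypothetical name; one
// such UDF would be needed per element type you want to support.
val wrapString = udf((s: String) => Seq(s))
spark.udf.register("wrapString", wrapString)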
