Spark UDF Overloading



我有一个要求,即Spark UDF必须重载,我知道Spark中不支持UDF重载。因此,为了克服spark的这一限制,我尝试创建一个接受Any类型的UDF,在UDF内部,它会找到实际的数据类型,并调用相应的方法进行计算并相应地返回值。当这样做的时候,我得到了一个错误作为

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Any is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:720)
at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:213)
at com.experian.spark_jobs.Test$.main(Test.scala:9)
at com.experian.spark_jobs.Test.main(Test.scala)

以下是示例代码:

import org.apache.spark.sql.SparkSession
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
spark.udf.register("testudf", testudf _)
spark.sql("create temporary view testView as select testudf(1, 2) as a").show()
spark.sql("select testudf(a, 5) from testView").show()
}
def testudf(a: Any, b: Any) = {
if (a.isInstanceOf[Integer] && b.isInstanceOf[Integer]) {
add(a.asInstanceOf[Integer], b.asInstanceOf[Integer])
} else if (a.isInstanceOf[java.math.BigDecimal] && b.isInstanceOf[java.math.BigDecimal]) {
add(a.asInstanceOf[java.math.BigDecimal], b.asInstanceOf[java.math.BigDecimal])
}
}
def add(decimal: java.math.BigDecimal, decimal1: java.math.BigDecimal): java.math.BigDecimal = {
decimal.add(decimal1)
}
def add(integer: Integer, integer1: Integer): Integer = {
integer + integer1
}
}

有可能实现上述要求吗?如果没有,请给我一个更好的方法。

注意:Spark版本-2.4.0

使用Dataframe(非类型化(的问题是,在编译时执行某种多态性之类的操作非常痛苦。理想情况下,拥有列类型将允许使用特定的";添加函数";实现,就像您使用Monoids一样。但Spark Dataframe API离这个世界很远。使用"数据集"或"无框架"有很大帮助。

在您的示例中,要在运行时检查类型,您将需要AnyRef而不是Any。这应该行得通。

最新更新