Registering all methods of a Scala object as Spark UDFs


  • For example, I have two objects, ObjectA and ObjectB:

object ObjectA {
  def funcA1(a: String): String = "#" + a + "#"
  def funcA2(a: String, b: Int): String = a * b
}

object ObjectB {
  def funcB1(a: String): String = "&" + a + "&"
  def funcB2(a: String, b: Int): Int = a.sum + b
}
  • I want to define a method somewhere else, with a signature like this:

def registeredAllMethod(className: String): Unit = {
  // How to implement ?
}
  • I want the registeredAllMethod function to take a class name and register all of the methods in that class as Spark UDFs. Usage would look like this:
// If I call:
registeredAllMethod("ObjectA")
// then in Spark SQL I can use:
sparkSession.sql("SELECT funcA1('test'), funcA2('test', 5)").show

// If I call:
registeredAllMethod("ObjectB")
// then in Spark SQL I can use:
sparkSession.sql("SELECT funcB1('test'), funcB2('test', 5)").show

Thank you for patiently reading this far. If you can solve this problem, I will be very grateful!

You can try making registeredAllMethod a macro:

import scala.language.experimental.macros
import scala.reflect.macros.blackbox

object Macros {
  def registeredAllMethod(className: String): Unit = macro registeredAllMethodImpl

  def registeredAllMethodImpl(c: blackbox.Context)(className: c.Tree): c.Tree = {
    import c.universe._
    // evaluate the class-name argument at compile time (so it must be
    // a literal or otherwise computable during macro expansion)
    val classNameStr = c.eval(c.Expr[String](className))
    // look up the object's symbol by its fully qualified name
    val moduleSymbol = c.mirror.staticModule(classNameStr)
    // generate one sparkSession.udf.register(...) call per method,
    // eta-expanding each def into a function value
    val calls = moduleSymbol.typeSignature.decls.toList
      .filter(decl => decl.isMethod && !decl.isConstructor)
      .map(methodSymbol =>
        q"sparkSession.udf.register(${methodSymbol.name.toString}, $methodSymbol _)"
      )
    q"..$calls"
  }
}

https://gist.github.com/DmytroMitin/0f8d044d839756dd68ee901703e68ee6
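
For completeness, here is a minimal sketch of how the macro would be called (the SparkSession setup is my assumption, not part of the gist). Two constraints follow from the implementation: the generated code refers to an identifier literally named sparkSession, which must be in scope at the call site, and since this is a def macro, Macros (and the objects being registered) must be compiled in an earlier compilation run, e.g. a separate subproject, than the calling code.

import org.apache.spark.sql.SparkSession

object App {
  def main(args: Array[String]): Unit = {
    // must be named exactly "sparkSession": the macro expands into
    // sparkSession.udf.register(...) calls at this point in the code
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("udf-registration")
      .getOrCreate()

    Macros.registeredAllMethod("ObjectA")

    sparkSession.sql("SELECT funcA1('test'), funcA2('test', 5)").show()
  }
}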

The other options don't seem to work:

  • The Scala toolbox produces java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF (a minimal sketch of this attempt appears after this list)

https://gist.github.com/DmytroMitin/615e7420b7de5d209c0631f269129f9a

  • The real Scala compiler behaves the same way

https://gist.github.com/DmytroMitin/28936be58ba943d7771d7d4ede58abff

  • Java reflection (with LambdaMetafactory) produces org.apache.spark.SparkException: Task not serializable, caused by java.io.NotSerializableException: App$$$Lambda$994/768702707

https://gist.github.com/DmytroMitin/387e75ed39148fc8e70839584392d946

  • Scala reflection (with a toolbox) also produces one of the two exceptions above, depending on whether we supply .register with a lambda or with an instance of an anonymous class

https://gist.github.com/DmytroMitin/2a292d35f3c3ac5cf96d22dd81721366
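
For reference, a minimal sketch of what the failing toolbox attempt looks like (sparkSession is assumed to be in scope; the gists above contain the full variants):

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

// compile a lambda at runtime and hand it to .register
val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
val compiled = toolbox
  .eval(toolbox.parse("""(a: String) => "#" + a + "#""""))
  .asInstanceOf[String => String]

sparkSession.udf.register("funcA1", compiled)

// registration succeeds, but running a query that uses the UDF fails:
// the runtime-compiled lambda deserializes as java.lang.invoke.SerializedLambda,
// which cannot be assigned to the scala.Function1 field of ScalaUDF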

Something in Spark breaks with these reflectively created functions. So a macro seems to be the best option.


Actually, I managed to fix the "Java reflection" approach as well, although it wasn't easy:

https://gist.github.com/DmytroMitin/68909e971141f442f75fa09c46f69b16

The trick is to create new FunctionN with Serializable {...}. I couldn't manage that with runtime compilation (e.g. with a reflective toolbox; whatever I did, I got a lambda rather than an instance of a proper class), only with bytecode manipulation (with Javassist). A hand-written sketch of the trick follows.
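
For illustration, this is the shape the trick has when written by hand (the gist generates the equivalent class with Javassist at runtime; sparkSession is assumed to be in scope):

// an actual anonymous class that implements both Function1 and Serializable,
// rather than a lambda (which Java serialization represents as SerializedLambda)
val funcA1Instance = new Function1[String, String] with Serializable {
  def apply(a: String): String = "#" + a + "#"
}

sparkSession.udf.register("funcA1", funcA1Instance)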

Still, the macro seems easier.


Also, you can turn the defs in the objects into vals, and then the serialization problems should go away:

https://gist.github.com/DmytroMitin/4000bfc43cb1343578c4dc5d18acf6dc
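
A minimal hand-written sketch of that variant (the gist automates the lookup; here the registration is spelled out per function to keep the example self-contained):

import org.apache.spark.sql.SparkSession

object ObjectA {
  // function values instead of defs: the lambdas scalac emits for these
  // are serializable, so Spark can ship them to executors
  val funcA1: String => String = a => "#" + a + "#"
  val funcA2: (String, Int) => String = (a, b) => a * b
}

def registerObjectA(sparkSession: SparkSession): Unit = {
  // the existing function instances are registered directly;
  // no runtime-generated lambda is involved
  sparkSession.udf.register("funcA1", ObjectA.funcA1)
  sparkSession.udf.register("funcA2", ObjectA.funcA2)
}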

Latest update