Registering a Hive custom UDF with Spark (Spark SQL) 2.0.0



I am working on a Spark 2.0.0 snippet where my requirement is to use the 'com.facebook.hive.udf.UDFNumberRows' function in my SQL context in one of the queries. On my cluster with Hive queries, I use it as a temporary function simply by defining: CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows', which is quite simple.

I tried registering it with the sparkSession, but got an error:

sparkSession.sql("""CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'""")
Error:

CREATE TEMPORARY FUNCTION rowsequence AS 'com.facebook.hive.udf.UDFNumberRows'
16/11/01 20:46:17 ERROR ApplicationMaster: User class threw exception: java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead.
java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead.
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeFunctionBuilder(SessionCatalog.scala:751)
    at org.apache.spark.sql.execution.command.CreateFunctionCommand.run(functions.scala:61)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$.delayedEndpoint$com$mediamath$spark$attribution$sparkjob$SparkVideoCidJoin$1(SparkVideoCidJoin.scala:75)
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$delayedInit$body.apply(SparkVideoCidJoin.scala:22)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$.main(SparkVideoCidJoin.scala:22)
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin.main(SparkVideoCidJoin.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)

Does anyone know how to register it the way Spark is asking, i.e. with the register API on sparkSession and SQLContext:

 sqlContext.udf.register(...)

In Spark 2.0,

sparkSession.udf.register(...) 

allows you to register Java or Scala UDFs (functions of type Long => Long), but not Hive GenericUDFs, which handle LongWritable instead of Long and can take a variable number of arguments.
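By contrast, an ordinary Scala function registers fine. A minimal sketch (the function name plusOne and the temp view name are illustrative only):

sparkSession.udf.register("plusOne", (x: Long) => x + 1)
sparkSession.range(5).createOrReplaceTempView("nums")  // single column named "id"
sparkSession.sql("SELECT plusOne(id) FROM nums").show()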

To register a Hive UDF, your first approach was the correct one:

sparkSession.sql("""CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'""")

But you have to enable Hive support first:

SparkSession.builder().enableHiveSupport()

and make sure the "spark-hive" dependency is present on your classpath.
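Putting the two together, a minimal sketch (the app name is illustrative, and it assumes the jar containing the UDF class is already on the classpath):

import org.apache.spark.sql.SparkSession

// enableHiveSupport swaps in the Hive-aware session catalog
val spark = SparkSession.builder()
  .appName("HiveUdfExample")  // illustrative name
  .enableHiveSupport()
  .getOrCreate()

// resolves the Hive UDF class from the classpath
spark.sql("CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'")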

Explanation:

Your error message

java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead

comes from the class SessionCatalog.

Calling SparkSession.builder().enableHiveSupport() replaces the SessionCatalog with a HiveSessionCatalog, in which the makeFunctionBuilder method is implemented.

Finally:

The UDF you want to use, 'com.facebook.hive.udf.UDFNumberRows', was written at a time when window functions were not available in Hive. I suggest you use them instead. You can check the Hive Reference, this introduction to Spark-SQL, or this one if you want to stick to the Scala syntax.
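For illustration, assuming UDFNumberRows was used to assign sequence numbers to rows, the window-function equivalent in the Scala API could look like this (table and column names here are hypothetical):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// number rows within each partition, ordered by a timestamp
val w = Window.partitionBy("user_id").orderBy("event_time")
val numbered = spark.table("events").withColumn("row_num", row_number().over(w))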

The problem you are facing is that Spark is not loading the jar library onto its classpath.

In our team, we load external libraries with the --jars option.

/usr/bin/spark-submit  --jars external_library.jar our_program.py --our_params 

You can check whether the external library is being loaded in the Spark History Server, under the Environment tab (spark.yarn.secondary.jars).

Then you will be able to register your UDF just as you wrote, once you enable Hive support as FurryMachine said:

sparkSession.sql("""
    CREATE TEMPORARY FUNCTION myFunc AS  
    'com.facebook.hive.udf.UDFNumberRows'
""")

You can find more information in spark-submit --help:
hadoop:~/projects/neocortex/src$ spark-submit --help
Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]   
Usage: spark-submit run-example [options] example-class [example args]
Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of local jars to include on the driver
                              and executor classpaths.

You can register a UDF directly with the SparkSession, as in sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1). See the detailed documentation here.
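A minimal end-to-end sketch of that call (the sample data and view name are illustrative only):

import sparkSession.implicits._

// register a plain Scala function as a SQL-callable UDF
sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)

// call it from SQL on some sample data
Seq((1, "a"), (2, "b")).toDF("num", "str").createOrReplaceTempView("t")
sparkSession.sql("SELECT myUDF(num, str) FROM t").show()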
