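I am working on a Spark 2.0.0 job in which I need to use the function 'com.facebook.hive.udf.UDFNumberRows' in my SQL context for one of the queries. On my cluster, with Hive queries, I use it as a temporary function simply by defining: CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows', which is quite simple.
I tried registering it with the sparkSession, but got an error: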
sparkSession.sql("""CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'""")
Error: CREATE TEMPORARY FUNCTION rowsequence AS 'com.facebook.hive.udf.UDFNumberRows'
16/11/01 20:46:17 ERROR ApplicationMaster: User class threw exception: java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead.
java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead.
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeFunctionBuilder(SessionCatalog.scala:751)
at org.apache.spark.sql.execution.command.CreateFunctionCommand.run(functions.scala:61)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$.delayedEndpoint$com$mediamath$spark$attribution$sparkjob$SparkVideoCidJoin$1(SparkVideoCidJoin.scala:75)
at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$delayedInit$body.apply(SparkVideoCidJoin.scala:22)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$.main(SparkVideoCidJoin.scala:22)
at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin.main(SparkVideoCidJoin.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
Does anyone know how to register it the way Spark is asking, i.e. with the register API on sparkSession and SQLContext:
sqlContext.udf.register(...)
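In Spark 2.0,
sparkSession.udf.register(...)
lets you register Java or Scala UDFs (functions of type Long => Long), but not Hive GenericUDFs, which work with LongWritable instead of Long and can take a variable number of arguments.
To register a Hive UDF, your first approach was correct: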
sparkSession.sql("""CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'""")
But you must enable Hive support first:
SparkSession.builder().enableHiveSupport()
and make sure the "spark-hive" dependency is present on your classpath.
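Putting the two steps together, a minimal sketch might look like the following. It assumes the spark-hive module (for example the org.apache.spark spark-hive artifact matching your Spark version) and the jar containing the UDF class are both on the classpath. The object name, the application name, and the helper createTempFunctionSql are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object HiveUdfSketch {
  // Hypothetical helper: builds the same DDL statement used in the question.
  def createTempFunctionSql(name: String, className: String): String =
    s"CREATE TEMPORARY FUNCTION $name AS '$className'"

  def main(args: Array[String]): Unit = {
    // enableHiveSupport() requires the spark-hive module on the classpath;
    // without it, getOrCreate() fails because the Hive classes cannot be found.
    val spark = SparkSession.builder()
      .appName("hive-udf-sketch") // hypothetical application name
      .enableHiveSupport()
      .getOrCreate()

    // Assumption: the jar with the UDF class is visible to driver and executors.
    spark.sql(createTempFunctionSql("myFunc", "com.facebook.hive.udf.UDFNumberRows"))

    spark.stop()
  }
}
```

No master is set in the builder, on the assumption that the job is launched through spark-submit as in the question.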
Explanation:
Your error message
java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead
comes from the class SessionCatalog.
Calling SparkSession.builder().enableHiveSupport() replaces SessionCatalog with HiveSessionCatalog, in which the makeFunctionBuilder method is implemented.
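Finally:
The UDF you want to use, 'com.facebook.hive.udf.UDFNumberRows', was written at a time when window functions were not available in Hive. I suggest you use window functions instead. You can check the Hive Reference, this Spark-SQL intro, or this if you want to stick to the Scala syntax.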
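As a rough illustration of the window-function route, the sketch below numbers rows within each group using row_number(), once through the Scala API and once in SQL. It assumes the UDF was being used to number rows per key. The column names grp and ts, the view name events, and the sample data are invented for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object RowNumberSketch {
  // Adds a 1-based row number within each `grp`, ordered by `ts`.
  def withRowNumber(df: DataFrame): DataFrame = {
    val w = Window.partitionBy(col("grp")).orderBy(col("ts"))
    df.withColumn("rn", row_number().over(w))
  }

  def main(args: Array[String]): Unit = {
    // Local master only so the sketch can run standalone.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("row-number-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data: (group key, timestamp).
    val events = Seq(("a", 10L), ("a", 5L), ("b", 7L)).toDF("grp", "ts")

    // Scala API version.
    withRowNumber(events).show()

    // Equivalent SQL version.
    events.createOrReplaceTempView("events")
    spark.sql(
      "SELECT grp, ts, row_number() OVER (PARTITION BY grp ORDER BY ts) AS rn FROM events"
    ).show()

    spark.stop()
  }
}
```

This version needs neither the external jar nor CREATE TEMPORARY FUNCTION, which is the main reason to prefer it when the numbering semantics match what you need.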
The problem you are facing is that Spark is not loading the jar library on its classpath.
In our team we load external libraries with the --jars option.
/usr/bin/spark-submit --jars external_library.jar our_program.py --our_params
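You can check whether the external library is being loaded in the Environment tab of the Spark History UI (spark.yarn.secondary.jars).
Then you will be able to register your UDF as you said, once you enable Hive support as FurryMachine said.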
sparkSession.sql("""
CREATE TEMPORARY FUNCTION myFunc AS
'com.facebook.hive.udf.UDFNumberRows'
""")
You can find more information in spark-submit --help:
hadoop:~/projects/neocortex/src$ spark-submit --help
Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]
Options:
--master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
on one of the worker machines inside the cluster ("cluster")
(Default: client).
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--name NAME A name of your application.
--jars JARS Comma-separated list of local jars to include on the driver
and executor classpaths.
You can register a UDF directly with the SparkSession, as in
sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)
See the detailed documentation here.
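For completeness, here is a small sketch of registering that Scala function and calling it from SQL. It only covers plain Scala UDFs, not Hive UDF classes. The object name, the helper callMyUdf, and the literal arguments are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object RegisterUdfSketch {
  // Registers the UDF from the answer and calls it once from SQL.
  def callMyUdf(spark: SparkSession): String = {
    spark.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)
    // String + Int concatenates, so this evaluates to "row-7".
    spark.sql("SELECT myUDF(7, 'row-') AS v").head().getString(0)
  }

  def main(args: Array[String]): Unit = {
    // Local master only so the sketch can run standalone.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("register-udf-sketch")
      .getOrCreate()

    println(callMyUdf(spark))

    spark.stop()
  }
}
```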