Registering a UDF in a Spark Streaming application



I have a Spark Streaming application, written in Scala, that uses Spark SQL. After getting an RDD I try to register a UDF, and it fails with the error below. Is it not possible to register UDFs in a Spark Streaming application?

Here is the code snippet that throws the error:

sessionStream.foreachRDD((rdd: RDD[(String)], time: Time) => {
      val sqlcc = SqlContextSingleton.getInstance(rdd.sparkContext)
      sqlcc.udf.register("getUUID", () => java.util.UUID.randomUUID().toString)
...
})
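
SqlContextSingleton is presumably the standard lazily instantiated singleton SQLContext holder described in the Spark Streaming programming guide; a rough sketch of what it might look like (not necessarily the exact code in use here):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Lazily instantiated singleton SQLContext, shared across streaming batches.
object SqlContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}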

This is the error thrown when I try to register the function:

Exception in thread "pool-6-thread-6" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
    at com.ignitionone.datapipeline.ClusterApp$$anonfun$CreateCheckpointStreamContext$1.apply(ClusterApp.scala:173)
    at com.ignitionone.datapipeline.ClusterApp$$anonfun$CreateCheckpointStreamContext$1.apply(ClusterApp.scala:164)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

The following works fine for me:

sessionStream.foreachRDD((rdd: RDD[Event], time: Time) => {
   // Truncate a millisecond timestamp down to the start of its minute.
   val f = (t: Long) => t - t % 60000
   val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
   import sqlContext.implicits._
   import org.apache.spark.sql.functions.{sum, udf}

   val df = rdd.toDF()
   val per_min = udf(f)
   val grouped = df.groupBy(per_min(df("created_at")) as "created_at",
                            df("blah"),
                            df("status")
                           ).agg(sum("price") as "price", sum("payout") as "payout", sum("counter") as "counter")
   ...
})
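
If you want to keep the register-style API from the question, the same pattern can also be written inside foreachRDD along these lines. This is an untested sketch: the getUUID registration mirrors the question's code, and the events table name and session column are placeholders.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Time

// sessionStream is the DStream[String] from the question.
sessionStream.foreachRDD((rdd: RDD[String], time: Time) => {
   val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
   import sqlContext.implicits._

   // Register the UDF at the start of each batch.
   sqlContext.udf.register("getUUID", () => java.util.UUID.randomUUID().toString)

   // Wrap each String in a Tuple1 so toDF can build a single-column DataFrame.
   val df = rdd.map(Tuple1(_)).toDF("session")
   df.registerTempTable("events")

   // The registered UDF is usable from SQL within this batch.
   val withIds = sqlContext.sql("SELECT getUUID() AS id, session FROM events")
   withIds.show()
})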
