ERROR Executor: Exception in task 0.0 in stage 6.0 (Spark Scala)

I have a JSON file:

{"name":"method2","name1":"test","parameter1":"C:/Users/test/Desktop/Online.csv","parameter2": 1.0}

I load it like this (note that Spark's JSON reader expects one JSON object per line, which this file satisfies):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._ // for the $"col" syntax (spark-shell imports this automatically)

val df = sqlContext.read.json("C:/Users/test/Desktop/data.json")
val df1 = df.select($"name", $"parameter1", $"parameter2")
df1.show()
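
For reference, given the single-record JSON above, df1.show() should print roughly the following (sketched from the data, not captured from a run; show() truncates values longer than 20 characters unless you call show(false)):

+-------+--------------------+----------+
|   name|          parameter1|parameter2|
+-------+--------------------+----------+
|method2|C:/Users/test/Des...|       1.0|
+-------+--------------------+----------+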

And I have the following three functions:

import org.apache.spark.sql.functions.{lit, sum} // lit and sum are used below

def method1(P1: String, P2: Double): Unit = {
  val data = spark.read.option("header", true).csv(P1)
  val rs = data.select("CID", "Sc").dropDuplicates("CID", "Sc").withColumn("Rat", lit(P2))
  val outPutPath = "C:/Users/test/Desktop/output"
  rs.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(outPutPath)
}

def method2(P1: String, P2: Double): Unit = {
  val data = spark.read.option("header", true).csv(P1)
  val rs = data.select("CID", "Sc").withColumn("r", lit(P2))
  val rs1 = rs.filter($"CID" =!= "").groupBy("CID", "Sc").agg(sum(rs("r")).alias("R"))
  val outPutPath = "C:/Users/test/Desktop/output"
  rs1.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(outPutPath)
}

def methodn(P1: String, P2: Double): Unit = {
  println("methodn printing")
  println(P2)
}
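
All three functions assume a SparkSession named spark, which spark-shell creates for you. In a standalone application you would need a setup along these lines (a minimal sketch; the app name is made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("json-dispatch") // hypothetical name, any name works
  .master("local[*]")
  .getOrCreate()
import spark.implicits._ // enables the $"col" syntax used above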

I am trying to call these functions with the following code:
df1.map(row => (row.getString(0), row.getString(1), row.getDouble(2))).foreach { x =>
  x._1.trim.toLowerCase match {
    case "method1" => method1(x._2, x._3)
    case "method2" => method2(x._2, x._3)
    case _         => methodn(x._2, x._3)
  }
}

Based on my JSON record it should call method2, but when I execute the code above I get the following error:

17/11/22 16:15:44 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6)
java.lang.NullPointerException
        at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.method2(<console>:24)
        at $line38.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:40)
        at $line38.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
17/11/22 16:15:44 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 6, localhost, executor driver): java.lang.NullPointerException
        at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.method2(<console>:24)
        at $line38.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:40)
        at $line38.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
17/11/22 16:15:44 ERROR TaskSetManager: Task 0 in stage 6.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost, executor driver): java.lang.NullPointerException
        at method2(<console>:24)
        at $anonfun$2.apply(<console>:40)
        at $anonfun$2.apply(<console>:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
  at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2325)
  at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
  at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
  at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
  ... 54 elided
Caused by: java.lang.NullPointerException
  at method2(<console>:24)
  at $anonfun$2.apply(<console>:40)
  at $anonfun$2.apply(<console>:37)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

Please help me figure out how to fix this.

You are getting the NullPointerException because you are trying to access the SparkSession (spark) inside your functions (method1, method2). That alone is not the actual problem; the actual problem is that you are calling these functions from inside the dataframe's map/foreach closure.

You cannot access variables defined outside a transformation from inside it. All three functions are invoked inside the distributed closure, where Spark finds no valid definition for the spark variable they use: it is null on the executors. That is the real reason you get the NullPointerException.
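
For instance, even this tiny spark-shell snippet should reproduce the same failure (a sketch, with a made-up path; the DataFrame and path are arbitrary):

// the closure passed to foreach runs on an executor, where `spark` is null
Seq(1, 2).toDF("n").foreach { _ =>
  spark.read.text("C:/tmp/whatever.txt") // NullPointerException before the path is even checked
}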

The solution is to call the functions where the spark variable is accessible, that is, on the driver rather than from inside the distributed closure. Collecting the (small) dataframe first and dispatching locally does the trick:

val process = df1.map(row => (row.getString(0), row.getString(1), row.getDouble(2))).collect() // materialize the rows on the driver
process.foreach { x =>
  x._1.trim.toLowerCase match {
    case "method1" => method1(x._2, x._3)
    case "method2" => method2(x._2, x._3)
    case _         => methodn(x._2, x._3)
  }
}
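
Equivalently, you can collect the rows directly and skip the intermediate tuple mapping; another small sketch of the same idea:

df1.collect().foreach { row =>
  row.getString(0).trim.toLowerCase match {
    case "method1" => method1(row.getString(1), row.getDouble(2))
    case "method2" => method2(row.getString(1), row.getDouble(2))
    case _         => methodn(row.getString(1), row.getDouble(2))
  }
}

Either way, collect is only appropriate here because df1 holds a handful of dispatch records; collecting a large dataframe to the driver is a different problem.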

I hope this answer helps.
