我有一个JSON文件。
{"name":"method2","name1":"test","parameter1":"C:/Users/test/Desktop/Online.csv","parameter2": 1.0}
我正在加载我的JSON文件。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/data.json")
val df1=df.select($"name",$"parameter1",$"parameter2").toDF()
df1.show()
我具有下面的3个功能:
def method1(P1:String, P2:Double) {
val data = spark.read.option("header", true).csv(P1).toDF()
val rs= data.select("CID", "Sc").dropDuplicates("CID", "Sc").withColumn("Rat", lit(P2))
val outPutPath="C:/Users/test/Desktop/output"
rs.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(outPutPath)
}
def method2(P1:String, P2:Double){
val data = spark.read.option("header", true).csv(P1).toDF()
val rs= data.select("CID", "Sc").withColumn("r", lit(P2))
val rs1= rs.filter($"CID" =!= "").groupBy("CID","Sc").agg(sum(rs("r")).alias("R"))
val outPutPath="C:/Users/test/Desktop/output"
rs1.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(outPutPath)
}
def methodn(P1:String, P2:Double) {
println("method 2 printhing")
println(P2)
}
我试图使用以下代码
调用上述功能df1.map( row => (row.getString(0), row.getString(1), row.getDouble(2) ) ).foreach { x =>
x._1.trim.toLowerCase match {
case "method1" => method1(x._2, x._3)
case "method2" => method2(x._2, x._3)
case _ => methodn(x._2, x._3)
}
}
基于我的JSON对象,它应该调用Method2,但是当我尝试执行上述代码时,我要低于错误。
17/11/22 16:15:44 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6)
java.lang.NullPointerException
at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.method2(<console>:24)
at $line38.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:40)
at $line38.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/11/22 16:15:44 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 6, localhost, executor driver): java.lang.NullPointerException
at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.method2(<console>:24)
at $line38.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:40)
at $line38.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/11/22 16:15:44 ERROR TaskSetManager: Task 0 in stage 6.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost, executor driver): java.lang.NullPointerException
at method2(<console>:24)
at $anonfun$2.apply(<console>:40)
at $anonfun$2.apply(<console>:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2325)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
... 54 elided
Caused by: java.lang.NullPointerException
at method2(<console>:24)
at $anonfun$2.apply(<console>:40)
at $anonfun$2.apply(<console>:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
请帮助我如何解决此问题。
您正在获取NullPointerException
,因为您正在尝试在 functions ( method1, method2
)内访问 sparkSession
( spark )。那不是实际问题。主要问题是您从dataframe
的map
函数中调用这些功能。那是主要问题。
您无法访问transformations
内定义的transformations
外部定义的变量。所有功能都在transformations
内部调用,并且Spark 在这些功能中使用的spark
变量找不到任何定义。那是获得nullPointerException
的主要原因。
解决方案是调用可以访问spark
变量的函数,而不是从transformation
中访问。因此,将您的最后一个transformation
更改为action
将执行技巧
val process = df1.map( row => (row.getString(0), row.getString(1), row.getDouble(2) ) ).collect
process.foreach { x =>
x._1.trim.toLowerCase match {
case "method1" => method1(x._2, x._3)
case "method2" => method2(x._2, x._3)
case _ => methodn(x._2, x._3)
}
}
我希望答案有帮助