I am doing some operations between two DataFrames:
val resultRdd = df1.join(df2, df1("column") === df2("column") &&
  df1("column2").contains(df2("column")), "left_outer").rdd
resultRdd.map { t => ... }
But every time I get this error:

Edit:
Job aborted due to stage failure: Task 114 in stage 40.0 failed 4 times, most recent failure: Lost task 114.3 in stage 40.0 (TID 908, 10.10.10.51, executor 1): java.lang.NullPointerException
at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Because of this error, I tried printing the data to debug it. While researching, I read in other questions that the DataFrame might only be accessible from the driver and not from the executors: NullPointerException in Spark RDD map when submitted as spark job
I have already tried coalesce() and collect(), but neither worked for me.
I don't know how to handle this problem. Any help?
I am using Spark 2.1.0.
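Since the NPE is thrown inside UTF8String.contains, one idea I want to try next (untested, and the column names below are just the placeholders from the snippet above) is to filter out null values on both sides of the contains condition before the join, so that no row can reach contains() with a null operand:

val df1NonNull = df1.filter(df1("column").isNotNull && df1("column2").isNotNull)
val df2NonNull = df2.filter(df2("column").isNotNull)
// Same join as before, but both contains() operands are guaranteed non-null.
val resultRdd = df1NonNull.join(df2NonNull,
  df1NonNull("column") === df2NonNull("column") &&
    df1NonNull("column2").contains(df2NonNull("column")),
  "left_outer").rdd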
Edit 2:
After some debugging, I think I found a clue:
My application runs on a server. If I execute it once, it works, but if I execute it again, it fails. If I restart the server so that the SparkContext is created fresh, it works again.
The error log is:
17/01/25 16:10:40 INFO TaskSetManager: Starting task 86.0 in stage 126.0 (TID 5249, localhost, executor driver, partition 86, ANY, 6678 bytes)
17/01/25 16:10:40 INFO TaskSetManager: Finished task 43.0 in stage 126.0 (TID 5248) in 6 ms on localhost (executor driver) (197/200)
17/01/25 16:10:40 INFO Executor: Running task 86.0 in stage 126.0 (TID 5249)
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 96 non-empty blocks out of 200 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 4 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 ERROR Executor: Exception in task 86.0 in stage 126.0 (TID 5249)
java.lang.NullPointerException
at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$class.foreach(Iterator.scala:750)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
at scala.collection.AbstractIterator.to(Iterator.scala:1202)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/01/25 16:10:40 INFO TaskSetManager: Starting task 114.0 in stage 126.0 (TID 5250, localhost, executor driver, partition 114, ANY, 6678 bytes)
17/01/25 16:10:40 INFO Executor: Running task 114.0 in stage 126.0 (TID 5250)
17/01/25 16:10:40 WARN TaskSetManager: Lost task 86.0 in stage 126.0 (TID 5249, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$class.foreach(Iterator.scala:750)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
at scala.collection.AbstractIterator.to(Iterator.scala:1202)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 4 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 ERROR TaskSetManager: Task 86 in stage 126.0 failed 1 times; aborting job
17/01/25 16:10:40 INFO Executor: Finished task 114.0 in stage 126.0 (TID 5250). 3800 bytes result sent to driver
17/01/25 16:10:40 INFO TaskSetManager: Finished task 114.0 in stage 126.0 (TID 5250) in 15 ms on localhost (executor driver) (198/200)
17/01/25 16:10:40 INFO TaskSchedulerImpl: Removed TaskSet 126.0, whose tasks have all completed, from pool
17/01/25 16:10:40 INFO TaskSchedulerImpl: Cancelling stage 126
17/01/25 16:10:40 INFO DAGScheduler: ResultStage 126 (collect at CategorizationSystem.scala:123) failed in 1.664 s due to Job aborted due to stage failure: Task 86 in stage 126.0 failed 1 times, most recent failure: Lost task 86.0 in stage 126.0 (TID 5249, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
at scala.collection.Iterator$class.foreach(Iterator.scala:750)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
at scala.collection.AbstractIterator.to(Iterator.scala:1202)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
17/01/25 16:10:40 INFO DAGScheduler: Job 19 failed: collect at CategorizationSystem.scala:123, took 8.517397 s
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116937
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116938
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116939
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116940
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116941
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116942
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116943
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116944
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116945
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 16
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117330
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117331
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117332
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117333
17/01/25 16:10:41 INFO BlockManagerInfo: Removed broadcast_36_piece0 on 192.168.80.136:35004 in memory (size: 20.6 KB, free: 613.8 MB)
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117334
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117335
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117336
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117337
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117338
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117339
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117340
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 17
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117341
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117342
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117343
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117344
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117345
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 18
17/01/25 16:10:41 INFO BlockManagerInfo: Removed broadcast_39_piece0 on 192.168.80.136:35004 in memory (size: 23.0 KB, free: 613.8 MB)
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102054
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102055
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102056
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102057
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102058
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102059
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102060
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102061
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 11
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102062
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102063
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102064
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102065
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102066
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 12
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 15
Any help?
I solved the problem by removing df1.cache in some parts of the code. I don't know why this fixes it, but it works.
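For anyone hitting the same thing, here is a minimal sketch of the change, using the placeholder column names from above. Removing the cache() call (or explicitly releasing the cached copy with unpersist() before the join) is what made the job succeed on repeated runs; disabling whole-stage code generation is another knob I have seen suggested for codegen-related failures, but I have not verified it myself:

// Previously: df1.cache() was called somewhere before the join; removing it fixed the NPE.
df1.unpersist() // explicitly drop any cached copy instead of caching
// Untested alternative suggested elsewhere for codegen-related failures:
// spark.conf.set("spark.sql.codegen.wholeStage", "false")
val resultRdd = df1.join(df2,
  df1("column") === df2("column") &&
    df1("column2").contains(df2("column")),
  "left_outer").rdd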