Strange NullPointerException from ORC


I get a strange NullPointerException in Spark when calling any action that processes all of the data. Interestingly,
val dOverallTotal = spark.read.orc("/path/to/file.orc/")
dOverallTotal.distinct.count // NPE!

fails with the NullPointerException, whereas:

val dOverallTotal = spark.read.orc("/path/to/file.orc/partition=part_value")
dOverallTotal.distinct.count // works just fine

works just fine. This holds for every available directory/part_value when the partitions are processed individually.
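To pin down which partition actually triggers the NPE, one way is to list the partition subdirectories with the Hadoop FileSystem API and run the failing action on each directory in isolation. This is a sketch under assumptions: the base path is a placeholder and `spark` is an active `SparkSession`.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.util.Try

// List the partition subdirectories under the table's base path
// (placeholder path) and try the failing action on each one alone.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partitionDirs = fs
  .listStatus(new Path("/path/to/file.orc/"))
  .filter(_.isDirectory)
  .map(_.getPath.toString)

partitionDirs.foreach { dir =>
  val ok = Try(spark.read.orc(dir).distinct.count).isSuccess
  println(s"$dir -> ${if (ok) "ok" else "fails"}")
}
```

Any directory printed as `fails` contains the files whose on-disk schema disagrees with the expected one.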

Stack trace below:

java.lang.NullPointerException
at org.apache.spark.sql.execution.datasources.orc.OrcColumnVector.getInt(OrcColumnVector.java:132)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/04/06 13:12:05 WARN TaskSetManager: Lost task 9.0 in stage 138.0 (TID 976, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.execution.datasources.orc.OrcColumnVector.getInt(OrcColumnVector.java:132)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/04/06 13:12:05 ERROR TaskSetManager: Task 9 in stage 138.0 failed 1 times; aborting job
20/04/06 13:12:05 WARN TaskSetManager: Lost task 31.0 in stage 138.0 (TID 998, localhost, executor driver): TaskKilled (Stage cancelled)
20/04/06 13:12:05 WARN TaskSetManager: Lost task 19.0 in stage 138.0 (TID 986, localhost, executor driver): TaskKilled (Stage cancelled)
20/04/06 13:12:05 WARN TaskSetManager: Lost task 16.0 in stage 138.0 (TID 983, localhost, executor driver): TaskKilled (Stage cancelled)
20/04/06 13:12:05 WARN TaskSetManager: Lost task 0.0 in stage 138.0 (TID 967, localhost, executor driver): TaskKilled (Stage cancelled)
20/04/06 13:12:05 WARN TaskSetManager: Lost task 4.0 in stage 138.0 (TID 971, localhost, executor driver): TaskKilled (Stage cancelled)

My Spark version is 2.3.x on HDP 3.1.

What worked for me was using the schema information stored in the Hive metastore:

import org.apache.spark.sql.catalyst.TableIdentifier

val m = spark.sessionState.catalog.getTableMetadata(TableIdentifier("table", Some("db")))
//val format = m.storage.serde.get
val format = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
spark.read
  .option("basePath", m.location.toString)
  .format(format)
  .schema(m.schema)
  .load("/input/orc/file/path/to/directory")

I suspect there is an inconsistency between the schema and what is actually stored in the files.
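Since the NPE surfaces in `OrcColumnVector.getInt`, which belongs to Spark 2.3's new native vectorized ORC reader, another thing worth trying is falling back to the older Hive-based reader, or disabling vectorized reads. These two configuration keys exist in Spark 2.3; whether they avoid the NPE for this particular schema mismatch is an assumption, not something I have verified.

```scala
// Use the Hive-based ORC reader instead of Spark 2.3's native one
spark.conf.set("spark.sql.orc.impl", "hive")
// Alternatively, keep the native reader but turn off vectorized reads
spark.conf.set("spark.sql.orc.enableVectorizedReader", "false")

spark.read.orc("/path/to/file.orc/").distinct.count
```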

Latest update