Spark job fails with an exception when saving DataFrame contents as a CSV file using Spark SQL



I am trying to save the contents of a DataFrame to HDFS in CSV format. It works with a small number of files, but when I try to process a larger number of files (90+), I get a NullPointerException and the job fails. Below is my code:

import org.apache.spark.sql.functions.{col, udf}

val df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "false").option("delimiter", "|").load("hdfs path for loading multiple files/*")
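// UDF that normalizes "month/date/year" strings to "year-month-date"; single-token values are passed through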
val mydateFunc = udf {(x: String) => x.split("/") match {case Array(month,date,year) => year+"-"+month+"-"+date case Array(y)=> y}}
val df2 = df1.withColumn("orderdate", mydateFunc(df1("Date on which the record was created"))).drop("Date on which the record was created")
val df3 = df2.withColumn("deliverydate", mydateFunc(df2("Requested delivery date"))).drop("Requested delivery date")
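// Regex that captures an article number starting with 44000 (10 digits) or 69499 (11 digits) embedded in the column value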
val exp = """(.*)(44000\d{5}|69499\d{6})(.*)""".r

val upc_extractor: (String => String) = (arg: String) => arg match { case exp(pref,required,suffx) => required case x:String => x }
val sqlfunc = udf(upc_extractor)
val df4 = df3.withColumn("formatted_UPC", sqlfunc(col("European Article Numbers/Universal Produ")))
df4.write.format("com.databricks.spark.csv").option("header", "false").save("destination path in hdfs to save the resultant files");

Below is the exception I get:

16/02/03 01:59:15 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/02/03 01:59:33 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 3)
java.lang.NullPointerException
        at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:30)
        at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:30)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:71)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$9$$anon$1.next(package.scala:165)
        at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$9$$anon$1.next(package.scala:158)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/02/03 01:59:33 INFO TaskSetManager: Starting task 32.0 in stage 1.0 (TID 33, localhost, ANY, 1692 bytes)
16/02/03 01:59:33 INFO Executor: Running task 32.0 in stage 1.0 (TID 33)
16/02/03 01:59:33 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 3, localhost): java.lang.NullPointerException
        at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:30)
        at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:30)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:71)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$9$$anon$1.next(package.scala:165)
        at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$9$$anon$1.next(package.scala:158)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/02/03 01:59:33 ERROR TaskSetManager: Task 2 in stage 1.0 failed 1 times; aborting job
16/02/03 01:59:33 INFO TaskSchedulerImpl: Cancelling stage 1
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 29.0 in stage 1.0 (TID 30)
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 8.0 in stage 1.0 (TID 9)
16/02/03 01:59:33 INFO TaskSchedulerImpl: Stage 1 was cancelled
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 0.0 in stage 1.0 (TID 1)

The Spark version is 1.4.1. Any help is much appreciated.

One of your input files is probably malformed. The first thing to do is to find that file. Once you have found it, try to locate the line that causes the problem, then look at that line closely and you will likely spot the issue. My guess is that the number of columns does not match what is expected, or that something is not escaped correctly. If you cannot find it, you can still update the question with the contents of the file.
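For example, a quick way to look for such records (a minimal sketch, reusing df1 and the column names from the question) is to filter for nulls in the columns the UDFs read, since x.split("/") throws a NullPointerException when the value is null:

// Rows with a null in either date column will make mydateFunc fail with an NPE
val badRows = df1.filter(df1("Date on which the record was created").isNull || df1("Requested delivery date").isNull)
println(badRows.count())
badRows.show(20)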

After adding an if condition to the udf mydateFunc to filter out the null values that caused the NPE, the code works fine and I am able to load all the files.

val mydateFunc = udf { (x: String) => if (x == null) x else x.split("/") match { case Array(month, date, year) => year + "-" + month + "-" + date case Array(y) => y } }
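An equivalent, slightly more idiomatic way to guard against nulls (a sketch with the same behavior as the if check above) is to wrap the value in Option; the upc_extractor udf may need a similar guard, because a null argument would not match either of its patterns:

val mydateFunc = udf { (x: String) =>
  // Option(null) is None, so a null value comes back out as null instead of triggering an NPE
  Option(x).map(_.split("/") match {
    case Array(month, date, year) => year + "-" + month + "-" + date
    case Array(y) => y
  }).orNull
}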
