Spark - 为什么我在 write.mode(SaveMode.Overwrite) 时获得 NPE,即使数据帧允许



我有一个包含 3 列的数据帧,其架构类似于以下内容:

org.apache.spark.sql.types.StructType = StructType(StructField(UUID,StringType,true), StructField(NAME,StringType,true), StructField(DOCUMENT,ArrayType(MapType(StringType,StringType,true),true),true))

这可能是此数据框中的行示例:

org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01))]

在此数据帧的最后一列上应用 udf 函数后,我正在生成一个新列。那个是和数组>

这是我正在应用的代码:

def number_length( num:String ) : String = { if(num.length < 6) "000000" else num }
def validating_doc = udf((inputSeq: Seq[Map[String, String]]) => {
  inputSeq.map(x => Map("source" -> x("source"),"document_number" -> number_length(x("document_number")),"first_seen"-> x("first_seen"))))
})
val newDF = DF.withColumn("VALID_DOCUMENT", validating_doc($"DOCUMENT"))

在此之后一切正常,我可以执行一些操作,例如 show 和 first ,返回:

org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01)),WrappedArray(Map(source -> central, document_number -> 000000, first_seen -> 2018-05-01))]

但是,如果我尝试将这个数据帧编写为 avro,则这样做:

newDF.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save("hdfs:///data/mypath")

我收到以下错误:

WARN scheduler.TaskSetManager: Lost task 3.0 in stage 0.0 (TID 6, myserver.azure.com): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:272)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:52)
        at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:51)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:263)

但是,如果我删除此新列,则可以写入数据帧。

写入数据帧时缺少什么? UDF 是否更改了架构中我不知道的内容?

您的代码在UDF调用中给出 NPE。您使用的函数不是null安全的,如果出现以下情况,它将失败:

  • inputSeq null.
  • inputSeq的任何元素都是null .
  • 任何document_number数都nullinputSeq的任何元素中null

如果缺少任何项目,它也会失败(尽管这里不是问题。您必须包括适当的检查,从这样的东西开始(未经测试(:

def number_length( num:String ) : String = num match { 
  case null => null
  case _ => if(num.length < 6) "000000" else num 
}

def validating_doc = udf((inputSeq: Seq[Map[String, String]]) => inputSeq match {
  case null => null
  case xs => xs.map {
    case null => null
    case x => Map(
      "source" -> x("source"),
      "document_number" ->  number_length(x("document_number")),
      "first_seen" -> x("first_seen")
    )
  }
})

为什么在 write.mode(SaveMode.Overwrite( 时获得 NPE,即使数据帧允许其他操作作为第一个或显示?

因为firstshow都只评估数据的子集,并且显然不会遇到有问题的行。

最新更新