我有一个包含 3 列的数据帧,其架构类似于以下内容:
org.apache.spark.sql.types.StructType = StructType(StructField(UUID,StringType,true), StructField(NAME,StringType,true), StructField(DOCUMENT,ArrayType(MapType(StringType,StringType,true),true),true))
这可能是此数据框中的行示例:
org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01))]
在此数据帧的最后一列上应用 udf 函数后,我正在生成一个新列。那个是和数组>
这是我正在应用的代码:
def number_length( num:String ) : String = { if(num.length < 6) "000000" else num }
def validating_doc = udf((inputSeq: Seq[Map[String, String]]) => {
inputSeq.map(x => Map("source" -> x("source"),"document_number" -> number_length(x("document_number")),"first_seen"-> x("first_seen"))))
})
val newDF = DF.withColumn("VALID_DOCUMENT", validating_doc($"DOCUMENT"))
在此之后一切正常,我可以执行一些操作,例如 show 和 first
,返回:
org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01)),WrappedArray(Map(source -> central, document_number -> 000000, first_seen -> 2018-05-01))]
但是,如果我尝试将这个数据帧编写为 avro,则这样做:
newDF.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save("hdfs:///data/mypath")
我收到以下错误:
WARN scheduler.TaskSetManager: Lost task 3.0 in stage 0.0 (TID 6, myserver.azure.com): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:272)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:52)
at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:51)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:263)
但是,如果我删除此新列,则可以写入数据帧。
写入数据帧时缺少什么? UDF 是否更改了架构中我不知道的内容?
您的代码在UDF
调用中给出 NPE。您使用的函数不是null
安全的,如果出现以下情况,它将失败:
-
inputSeq
null
. inputSeq
的任何元素都是null
.- 任何
document_number
数都null
在inputSeq
的任何元素中null
。
如果缺少任何项目,它也会失败(尽管这里不是问题。您必须包括适当的检查,从这样的东西开始(未经测试(:
def number_length( num:String ) : String = num match {
case null => null
case _ => if(num.length < 6) "000000" else num
}
def validating_doc = udf((inputSeq: Seq[Map[String, String]]) => inputSeq match {
case null => null
case xs => xs.map {
case null => null
case x => Map(
"source" -> x("source"),
"document_number" -> number_length(x("document_number")),
"first_seen" -> x("first_seen")
)
}
})
为什么在 write.mode(SaveMode.Overwrite( 时获得 NPE,即使数据帧允许其他操作作为第一个或显示?
因为first
和show
都只评估数据的子集,并且显然不会遇到有问题的行。