Converting a DataFrame to XML in Spark throws a NullPointerException in StaxXmlGenerator when writing to the file system



I am reading an XML file with a SparkSession based on a given rowTag. The resulting DataFrame then needs to be written back out as an XML file. Below is the code I am trying:

val sparkSession = SparkSession.builder.master("local[*]").getOrCreate()
// SEAConstant.STR_IMPORT_SPARK_DATA_BRICK_XML and SEAConstant.STR_ROW_TAG are
// application constants, presumably "com.databricks.spark.xml" and "rowTag".
val xmldf = sparkSession.read.format(SEAConstant.STR_IMPORT_SPARK_DATA_BRICK_XML)
  .option(SEAConstant.STR_ROW_TAG, "Employee").option("nullValue", "").load("demo.xml")
val columnNames = xmldf.columns.toSeq
val sdf = xmldf.select(columnNames.map(c => xmldf.col(c)): _*)
sdf.write.format("com.databricks.spark.xml").option("rootTag", "Company")
  .option("rowTag", "Employee").save("Rel")

Here is the XML file:

<?xml version="1.0"?>
<Company>
  <Employee id="id47" masterRef="#id53" revision="" nomenclature="">
<ApplicationRef version="J.0" application="Teamcenter"></ApplicationRef>
<UserData id="id52">
<UserValue valueRef="#id4" value="" title="_CONFIG_CONTEXT"></UserValue></UserData></Employee>
<Employee id="id47" masterRef="#id53" revision="" nomenclature="">
<ApplicationRef version="B.0" application="Teamcenter"></ApplicationRef>
<UserData id="id63">
<UserValue valueRef="#id5" value="" title="_CONFIG_CONTEXT"></UserValue></UserData></Employee>
</Company>

The problem here is that if I build the sdf above by hand-picking any 3 columns from xmldf, it works fine and creates the XML file. But if I pass in all of the columns, even when there are only 2 or 3 of them, it fails with the following error:

19/06/25 14:45:14 ERROR Utils: Aborting task
java.lang.NullPointerException
    at com.databricks.spark.xml.parsers.StaxXmlGenerator$$anonfun$apply$4.apply(StaxXmlGenerator.scala:131)
    at com.databricks.spark.xml.parsers.StaxXmlGenerator$$anonfun$apply$4.apply(StaxXmlGenerator.scala:129)
    at scala.collection.immutable.List.foreach(List.scala:383)
    at com.databricks.spark.xml.parsers.StaxXmlGenerator$.apply(StaxXmlGenerator.scala:129)
    at com.databricks.spark.xml.util.XmlFile$$anonfun$1$$anon$1.next(XmlFile.scala:108)
    at com.databricks.spark.xml.util.XmlFile$$anonfun$1$$anon$1.next(XmlFile.scala:96)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:363)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:125)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1414)
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/06/25 14:45:14 ERROR SparkHadoopWriter: Task attempt_20190625144513_0012_m_000000_0 aborted.
19/06/25 14:45:14 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I have looked in various places but could not find a solution. Also, using the same sdf generated above, I am able to create a JSON file successfully. Any ideas?
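
For reference, the JSON write that does succeed with the same sdf looks roughly like this (a minimal sketch; the output path is just illustrative):

// Sketch: the same DataFrame written as JSON instead of XML (this works).
sdf.write.mode("overwrite").json("RelJson")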

I am not able to reproduce your NullPointerException...

Using Spark 2.2.0 and the XML dependency below:

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-xml_2.11</artifactId>
    <version>0.4.1</version>
</dependency>
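
For an sbt build, the equivalent dependency would presumably be (assumption: Scala 2.11, to match the artifact above):

// build.sbt (assumption: Scala 2.11, matching spark-xml_2.11)
libraryDependencies += "com.databricks" %% "spark-xml" % "0.4.1"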

I saved the same data and read it back successfully, like below:

package com.examples
import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.log4j.Level
import org.apache.spark.sql.{SQLContext, SparkSession}
/**
  * Created by Ram Ghadiyaram
  */
object SparkXmlTest {
  org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)
  def main(args: Array[String]) {
    val spark = SparkSession.builder
      .master("local")
      .appName(this.getClass.getName)
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    val sc = spark.sparkContext
    val sqlContext = new SQLContext(sc)
    val str =
    """
        |<?xml version="1.0"?>
        |<Company>
        |  <Employee id="1">
        |      <Email>tp@xyz.com</Email>
        |      <UserData id="id32" type="AttributesInContext">
        |      <UserValue value="7in" title="Height"></UserValue>
        |      <UserValue value="23lb" title="Weight"></UserValue>
        |</UserData>
        |  </Employee>
        |  <Measures id="1">
        |      <Email>newdata@rty.com</Email>
        |      <UserData id="id32" type="SitesInContext">
        |</UserData>
        |  </Measures>
        |  <Employee id="2">
        |      <Email>tp@xyz.com</Email>
        |      <UserData id="id33" type="AttributesInContext">
        |      <UserValue value="7in" title="Height"></UserValue>
        |      <UserValue value="34lb" title="Weight"></UserValue>
        |</UserData>
        |  </Employee>
        |  <Measures id="2">
        |      <Email>nextrow@rty.com</Email>
        |      <UserData id="id35" type="SitesInContext">
        |</UserData>
        |  </Measures>
        |  <Employee id="3">
        |      <Email>tp@xyz.com</Email>
        |      <UserData id="id34" type="AttributesInContext">
        |      <UserValue value="7in" title="Height"></UserValue>
        |      <UserValue value="" title="Weight"></UserValue>
        |</UserData>
        |  </Employee>
        |</Company>
      """.stripMargin
    println("save to file ")
    val f = new File("xmltest.xml")
    FileUtils.writeStringToFile(f, str)

    val xmldf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load(f.getAbsolutePath)
    val columnNames = xmldf.columns.toSeq
    val sdf = xmldf.select(columnNames.map(c => xmldf.col(c)): _*)
    sdf.write.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .mode("overwrite")
      .save("/src/main/resources/Rel1")

    println("read back from saved file ....")
    val readbackdf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load("/src/main/resources/Rel1")
    readbackdf.show(false)
  }
}

Result:

save to file 
read back from saved file ....
+----------+------------------------------------------------------------------------------+---+
|Email     |UserData                                                                      |_id|
+----------+------------------------------------------------------------------------------+---+
|tp@xyz.com|[WrappedArray([null,Height,7in], [null,Weight,23lb]),id32,AttributesInContext]|1  |
|tp@xyz.com|[WrappedArray([null,Height,7in], [null,Weight,34lb]),id33,AttributesInContext]|2  |
|tp@xyz.com|[WrappedArray([null,Height,7in], [null,Weight,null]),id34,AttributesInContext]|3  |
+----------+------------------------------------------------------------------------------+---+

Update: with the latest XML the OP added, I tried it, got the same exception, and fixed it with the options below...


.option("attributePrefix", "_Att")
      .option("valueTag", "_VALUE")

The full set of options and their descriptions from Databricks:

This package allows reading XML files in local or distributed file systems as Spark DataFrames. When reading files, the API accepts several options:

path: location of files. Similar to Spark, it can accept standard Hadoop globbing expressions.
rowTag: the row tag of your XML files to treat as a row. For example, in the XML <books><book>...</book></books>, the appropriate value would be book. Default is ROW. At the moment, rows containing self-closing XML tags are not supported.
samplingRatio: sampling ratio for inferring the schema (0.0 ~ 1). Default is 1.
excludeAttribute: whether to exclude attributes in elements. Default is false.
treatEmptyValuesAsNulls: (deprecated: use nullValue set to "") whether to treat whitespace as a null value. Default is false.
mode: the mode for dealing with corrupt records during parsing. Default is PERMISSIVE. PERMISSIVE: when it encounters a corrupted record, it sets all fields to null and puts the malformed string into a new field configured by columnNameOfCorruptRecord; when it encounters a field of the wrong data type, it sets the offending field to null. DROPMALFORMED: ignores whole corrupted records. FAILFAST: throws an exception when it meets corrupted records.
inferSchema: if true, attempts to infer an appropriate type for each resulting DataFrame column, like a boolean, numeric or date type. If false, all resulting columns are of string type. Default is true.
columnNameOfCorruptRecord: the name of the new field where malformed strings are stored. Default is _corrupt_record.
attributePrefix: the prefix for attributes, so that attributes and elements can be differentiated. This will be the prefix for field names. Default is _.
valueTag: the tag used for the value when there are attributes in an element that has no child. Default is _VALUE.
charset: defaults to UTF-8, but can be set to other valid charset names.
ignoreSurroundingSpaces: defines whether surrounding whitespace around values being read should be skipped. Default is false.

When writing files, the API accepts several options:

path: location to write files.
rowTag: the row tag of your XML files to treat as a row. For example, in the XML <books><book>...</book></books>, the appropriate value would be book. Default is ROW.
rootTag: the root tag of your XML files to treat as the root. For example, in the XML <books><book>...</book></books>, the appropriate value would be books. Default is ROWS.
nullValue: the value to write for null values. Default is the string "null". When this is null, it does not write attributes and elements for those fields.
attributePrefix: the prefix for attributes, so that attributes and elements can be differentiated. This will be the prefix for field names. Default is _.
valueTag: the tag used for the value when there are attributes in an element that has no child. Default is _VALUE.
compression: compression codec to use when saving to file. Should be the fully qualified name of a class implementing org.apache.hadoop.io.compress.CompressionCodec, or one of the case-insensitive shortened names (bzip2, gzip, lz4 and snappy). Defaults to no compression when a codec is not specified.

Currently the shortened name usage is supported: you can use just xml instead of com.databricks.spark.xml.
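
As a minimal sketch of how a few of these options combine on read and write (spark is the SparkSession from the examples; paths, tag names and option values here are illustrative, not taken from the example below):

// Read using the shortened format name "xml"; empty strings become null,
// and column types are inferred.
val optionsDf = spark.read.format("xml")
  .option("rowTag", "Employee")
  .option("nullValue", "")
  .option("inferSchema", "true")
  .load("demo.xml")

// Write with explicit root/row tags, attribute prefix, value tag and gzip output.
optionsDf.write.format("xml")
  .option("rootTag", "Company")
  .option("rowTag", "Employee")
  .option("attributePrefix", "_Att")
  .option("valueTag", "_VALUE")
  .option("compression", "gzip")
  .mode("overwrite")
  .save("company-out")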

Full example here:

package com.examples
import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{SQLContext, SparkSession}
/**
  * Created by Ram Ghadiyaram
  */
object SparkXmlTest {
  // org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)
  def main(args: Array[String]) {
    val spark = SparkSession.builder
      .master("local")
      .appName(this.getClass.getName)
      .getOrCreate()
    //  spark.sparkContext.setLogLevel("ERROR")
    val sc = spark.sparkContext
    val sqlContext = new SQLContext(sc)
    //    val str =
    //    """
    //        |<?xml version="1.0"?>
    //        |<Company>
    //        |  <Employee id="1">
    //        |      <Email>tp@xyz.com</Email>
    //        |      <UserData id="id32" type="AttributesInContext">
    //        |      <UserValue value="7in" title="Height"></UserValue>
    //        |      <UserValue value="23lb" title="Weight"></UserValue>
    //        |</UserData>
    //        |  </Employee>
    //        |  <Measures id="1">
    //        |      <Email>newdata@rty.com</Email>
    //        |      <UserData id="id32" type="SitesInContext">
    //        |</UserData>
    //        |  </Measures>
    //        |  <Employee id="2">
    //        |      <Email>tp@xyz.com</Email>
    //        |      <UserData id="id33" type="AttributesInContext">
    //        |      <UserValue value="7in" title="Height"></UserValue>
    //        |      <UserValue value="34lb" title="Weight"></UserValue>
    //        |</UserData>
    //        |  </Employee>
    //        |  <Measures id="2">
    //        |      <Email>nextrow@rty.com</Email>
    //        |      <UserData id="id35" type="SitesInContext">
    //        |</UserData>
    //        |  </Measures>
    //        |  <Employee id="3">
    //        |      <Email>tp@xyz.com</Email>
    //        |      <UserData id="id34" type="AttributesInContext">
    //        |      <UserValue value="7in" title="Height"></UserValue>
    //        |      <UserValue value="" title="Weight"></UserValue>
    //        |</UserData>
    //        |  </Employee>
    //        |</Company>
    //      """.stripMargin
    val str =
    """
      |<Company>
      |  <Employee id="id47" masterRef="#id53" revision="" nomenclature="">
      |<ApplicationRef version="J.0" application="Teamcenter"></ApplicationRef>
      |<UserData id="id52">
      |<UserValue valueRef="#id4" value="" title="_CONFIG_CONTEXT"></UserValue></UserData></Employee>
      |<Employee id="id47" masterRef="#id53" revision="" nomenclature="">
      |<ApplicationRef version="B.0" application="Teamcenter"></ApplicationRef>
      |<UserData id="id63">
      |<UserValue valueRef="#id5" value="" title="_CONFIG_CONTEXT"></UserValue></UserData></Employee>
      |</Company>
    """.stripMargin
    println("save to file ")
    val f = new File("xmltest.xml")
    FileUtils.writeStringToFile(f, str)

    val xmldf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load(f.getAbsolutePath)
    val columnNames = xmldf.columns.toSeq
    val sdf = xmldf.select(columnNames.map(c => xmldf.col(c)): _*)
    sdf.write.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("attributePrefix", "_Att")
      .option("valueTag", "_VALUE")
      .mode("overwrite")
      .save("./src/main/resources/Rel1")

    println("read back from saved file ....")
    val readbackdf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load("./src/main/resources/Rel1")
    readbackdf.show(false)
  }
}

Result:

save to file 
read back from saved file ....
+-----------------+-------------------------------+----+----------+
|ApplicationRef   |UserData                       |_id |_masterRef|
+-----------------+-------------------------------+----+----------+
|[Teamcenter, J.0]|[[_CONFIG_CONTEXT, #id4], id52]|id47|#id53     |
|[Teamcenter, B.0]|[[_CONFIG_CONTEXT, #id5], id63]|id47|#id53     |
+-----------------+-------------------------------+----+----------+

xmldf.write.format("com.databricks.spark.xml").option("rootTag", "Company")
  .option("rowTag", "Employee").option("attributePrefix", "_Att")
  .option("valueTag", "_VALUE").save("Rel")

Replace the corresponding statement in the OP with the snippet above. The StaxParser actually looks for these attributePrefix and valueTag options; otherwise it throws the NPE. I found this while looking at this GitHub link.
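
To see which columns the writer will treat as attributes, a quick check is to print the schema of the DataFrame that was read back (a small sketch, reusing readbackdf from the full example above):

// Sketch: with the reader's default attributePrefix "_", XML attributes appear
// as columns like "_id" and "_masterRef" (visible in the show() output above).
// On write, fields whose names start with the configured attributePrefix are
// emitted as XML attributes, and the valueTag field, when present, becomes the
// element text.
readbackdf.printSchema()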
