如何使用pyspark解析xml字符串列



我正在尝试用pyspark解析多个xml文件。所有xml文件都具有相同的已知模式
首先,我将所有文件加载为文本,以激发DF:

path = 'c:\path\to\xml\files\*.xml'
df = spark.read.text(path)

在这一点上,我的DF看起来是这样的:

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| value                                                                                     
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>|
|<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

xml文件模式如下:

df.printSchema()
root
|-- Header: struct (nullable = false)
|    |-- tag1: string (nullable = false)
|    |-- tag2: integer (nullable = false)
|    |-- tag3: timestamp (nullable = false)
|-- Body: struct (nullable = false)
|    |-- Pair: array (nullable = false)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- N: string (nullable = false)
|    |    |    |-- V: string (nullable = false)

所以解析后的最终输出应该是这样的:

+---------+-----+------------------------+---+--+
|tag1     | tag2| tag3                   | N |V |
+---------+-----+------------------------+---+--+
|some str1| 2   |2022-02-16 10:39:26.730 |N1 |V1|
|some str1| 2   |2022-02-16 10:39:26.730 |N2 |V2|
|some str1| 2   |2022-02-16 10:39:26.730 |N3 |V3|
|some str2| 5   |2022-02-17 10:39:26.730 |N4 |V4|
|some str2| 5   |2022-02-17 10:39:26.730 |N5 |V5|
+---------+-----+------------------------+---+--+

意思是";标题";元素应该为来自同一xml字符串的所有NV对重复自身
所以我想我找到了一种用xpathxml.etree.ElementTree提取所有头标签的方法,但我的问题是,我真的不知道如何将NV对提取到以后可以爆炸的东西中。

我错过了什么?

----澄清----
我试图用加载我的xml文件

path = 'c:\path\to\xml\files\*.xml'
df = spark.read.format('xml').option('rowTag','Msg').schema(schema).load(path)

但此选项无法提供*.xml路径,所以这就是我将文件读取为文本的原因。

尝试一下:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.format('xml').options(rowTag='book').load('books.xml')

请参阅https://github.com/databricks/spark-xml#python-api

根据您的spark版本,您必须将其添加到环境中。我使用的是spark 2.4.0,这个版本对我有效。databricks xml版本

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.7.0 pyspark-shell'

input_path.xml如下所示:

<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>
<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>
input_path = 'src/input/input.xml'
xmlDF = spark.read.format('xml').option('rowTag', 'Msg').load(input_path)
xmlDF.printSchema()
root
|-- Body: struct (nullable = true)
|    |-- Pair: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- N: string (nullable = true)
|    |    |    |-- V: string (nullable = true)
|-- Header: struct (nullable = true)
|    |-- tag1: string (nullable = true)
|    |-- tag2: long (nullable = true)
|    |-- tag3: timestamp (nullable = true)

由于不能在同一个查询中分解两个列表,因此可以这样划分:

xmlDF.select(
'*',
explode("Body.Pair.N").alias('N')
).select(
'N',
explode("Body.Pair.V").alias('V'),
col("Header.tag1").alias('tag1'),
col("Header.tag2").alias('tag2'),
col("Header.tag3").alias('tag3'),
) 
.dropDuplicates() 
.show(truncate=False)

它将根据您的输入给出以下结果:

+---+---+---------+----+----------------------+
|N  |V  |tag1     |tag2|tag3                  |
+---+---+---------+----+----------------------+
|N2 |V1 |some str1|2   |2022-02-16 10:39:26.73|
|N4 |V5 |some str2|5   |2022-02-17 10:39:26.73|
|N1 |V3 |some str1|2   |2022-02-16 10:39:26.73|
|N5 |V5 |some str2|5   |2022-02-17 10:39:26.73|
|N5 |V4 |some str2|5   |2022-02-17 10:39:26.73|
|N4 |V4 |some str2|5   |2022-02-17 10:39:26.73|
|N1 |V1 |some str1|2   |2022-02-16 10:39:26.73|
|N3 |V3 |some str1|2   |2022-02-16 10:39:26.73|
|N2 |V2 |some str1|2   |2022-02-16 10:39:26.73|
|N3 |V2 |some str1|2   |2022-02-16 10:39:26.73|
|N1 |V2 |some str1|2   |2022-02-16 10:39:26.73|
|N3 |V1 |some str1|2   |2022-02-16 10:39:26.73|
|N2 |V3 |some str1|2   |2022-02-16 10:39:26.73|
+---+---+---------+----+----------------------+

它似乎与我的本地spark版本有关。我已经更新了它,还将HADOOP_HOME添加到了我的PATH中。所以现在:

path = 'c:\path\to\xml\files\*.xml'
df = spark.read.format('xml').option('rowTag','Msg').schema(schema).load(path)

工作完美!

相关内容

  • 没有找到相关文章

最新更新