我正在尝试用pyspark解析多个xml文件。所有xml文件都具有相同的已知模式
首先,我将所有文件加载为文本,以激发DF:
path = 'c:\path\to\xml\files\*.xml'
df = spark.read.text(path)
在这一点上,我的DF看起来是这样的:
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| value
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>|
|<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
xml文件模式如下:
df.printSchema()
root
|-- Header: struct (nullable = false)
| |-- tag1: string (nullable = false)
| |-- tag2: integer (nullable = false)
| |-- tag3: timestamp (nullable = false)
|-- Body: struct (nullable = false)
| |-- Pair: array (nullable = false)
| | |-- element: struct (containsNull = true)
| | | |-- N: string (nullable = false)
| | | |-- V: string (nullable = false)
所以解析后的最终输出应该是这样的:
+---------+-----+------------------------+---+--+
|tag1 | tag2| tag3 | N |V |
+---------+-----+------------------------+---+--+
|some str1| 2 |2022-02-16 10:39:26.730 |N1 |V1|
|some str1| 2 |2022-02-16 10:39:26.730 |N2 |V2|
|some str1| 2 |2022-02-16 10:39:26.730 |N3 |V3|
|some str2| 5 |2022-02-17 10:39:26.730 |N4 |V4|
|some str2| 5 |2022-02-17 10:39:26.730 |N5 |V5|
+---------+-----+------------------------+---+--+
意思是";标题";元素应该为来自同一xml字符串的所有NV对重复自身
所以我想我找到了一种用xpath
或xml.etree.ElementTree
提取所有头标签的方法,但我的问题是,我真的不知道如何将NV对提取到以后可以爆炸的东西中。
我错过了什么?
----澄清----
我试图用加载我的xml文件
path = 'c:\path\to\xml\files\*.xml'
df = spark.read.format('xml').option('rowTag','Msg').schema(schema).load(path)
但此选项无法提供*.xml
路径,所以这就是我将文件读取为文本的原因。
尝试一下:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.format('xml').options(rowTag='book').load('books.xml')
请参阅https://github.com/databricks/spark-xml#python-api
根据您的spark版本,您必须将其添加到环境中。我使用的是spark 2.4.0,这个版本对我有效。databricks xml版本
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.7.0 pyspark-shell'
input_path.xml如下所示:
<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>
<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>
input_path = 'src/input/input.xml'
xmlDF = spark.read.format('xml').option('rowTag', 'Msg').load(input_path)
xmlDF.printSchema()
root
|-- Body: struct (nullable = true)
| |-- Pair: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- N: string (nullable = true)
| | | |-- V: string (nullable = true)
|-- Header: struct (nullable = true)
| |-- tag1: string (nullable = true)
| |-- tag2: long (nullable = true)
| |-- tag3: timestamp (nullable = true)
由于不能在同一个查询中分解两个列表,因此可以这样划分:
xmlDF.select(
'*',
explode("Body.Pair.N").alias('N')
).select(
'N',
explode("Body.Pair.V").alias('V'),
col("Header.tag1").alias('tag1'),
col("Header.tag2").alias('tag2'),
col("Header.tag3").alias('tag3'),
)
.dropDuplicates()
.show(truncate=False)
它将根据您的输入给出以下结果:
+---+---+---------+----+----------------------+
|N |V |tag1 |tag2|tag3 |
+---+---+---------+----+----------------------+
|N2 |V1 |some str1|2 |2022-02-16 10:39:26.73|
|N4 |V5 |some str2|5 |2022-02-17 10:39:26.73|
|N1 |V3 |some str1|2 |2022-02-16 10:39:26.73|
|N5 |V5 |some str2|5 |2022-02-17 10:39:26.73|
|N5 |V4 |some str2|5 |2022-02-17 10:39:26.73|
|N4 |V4 |some str2|5 |2022-02-17 10:39:26.73|
|N1 |V1 |some str1|2 |2022-02-16 10:39:26.73|
|N3 |V3 |some str1|2 |2022-02-16 10:39:26.73|
|N2 |V2 |some str1|2 |2022-02-16 10:39:26.73|
|N3 |V2 |some str1|2 |2022-02-16 10:39:26.73|
|N1 |V2 |some str1|2 |2022-02-16 10:39:26.73|
|N3 |V1 |some str1|2 |2022-02-16 10:39:26.73|
|N2 |V3 |some str1|2 |2022-02-16 10:39:26.73|
+---+---+---------+----+----------------------+
它似乎与我的本地spark版本有关。我已经更新了它,还将HADOOP_HOME添加到了我的PATH中。所以现在:
path = 'c:\path\to\xml\files\*.xml'
df = spark.read.format('xml').option('rowTag','Msg').schema(schema).load(path)
工作完美!