我需要打开存储在Azure Datalake Store上的多个XML文件的内容并将其复制到Azure SQL DB中。这是 XML 文件结构:
<?xml version="1.0" encoding="utf-8"?>
<FileSummary xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="invoices.xsd">
<Header>
<SequenceNumber>1</SequenceNumber>
<Description>Hello</Description>
<ShipDate>20180101</ShipDate>
</Header>
<FileInvoices>
<InvoiceNumber>000000A</InvoiceNumber>
<InvoiceHeader>
<InvoiceHeaderDate>201800201</InvoiceHeaderDate>
<InvoiceHeaderDescription>XYZ</InvoiceHeaderDescription>
</InvoiceHeader>
<InvoiceItems>
<ItemId>000001</ItemId>
<ItemQuantity>000010</ItemQuantity>
<ItemPrice>000100</ItemPrice>
</InvoiceItems>
</FileInvoices>
<FileInvoices>
<InvoiceNumber>000000B</InvoiceNumber>
<InvoiceHeader>
<InvoiceHeaderDate>201800301</InvoiceHeaderDate>
<InvoiceHeaderDescription>ABC</InvoiceHeaderDescription>
</InvoiceHeader>
<InvoiceItems>
<ItemId>000002</ItemId>
<ItemQuantity>000020</ItemQuantity>
<ItemPrice>000200</ItemPrice>
</InvoiceItems>
</FileInvoices>
</FileSummary>
因此,我使用 Azure Databricks 将 Datalake 存储挂载为"/mnt/testdata",然后尝试使用以下命令打开上面的示例文件
dfXml = (sqlContext.read.format("xml") # requires maven library <HyukjinKwon:spark-xml:0.1.1-s_2.11>
.options(rootTag='FileSummary')
.load('/mnt/testdata/data/invoices_file1.xml'))
dfXml.cache()
print ("Number of records in this dataframe: " + str(dfXml.count()))
dfXml.printSchema()
返回以下结果:
dfXml:pyspark.sql.dataframe.DataFrame
FileInvoices:array
element:struct
InvoiceHeader:struct
InvoiceHeaderDate:long
InvoiceHeaderDescription:string
InvoiceItems:struct
ItemId:long
ItemPrice:long
ItemQuantity:long
InvoiceNumber:string
Header:struct
Description:string
SequenceNumber:long
ShipDate:long
xmlns:xsi:string
xsi:noNamespaceSchemaLocation:string
Number of records in this dataframe: 1
root
|-- FileInvoices: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- InvoiceHeader: struct (nullable = true)
| | | |-- InvoiceHeaderDate: long (nullable = true)
| | | |-- InvoiceHeaderDescription: string (nullable = true)
| | |-- InvoiceItems: struct (nullable = true)
| | | |-- ItemId: long (nullable = true)
| | | |-- ItemPrice: long (nullable = true)
| | | |-- ItemQuantity: long (nullable = true)
| | |-- InvoiceNumber: string (nullable = true)
|-- Header: struct (nullable = true)
| |-- Description: string (nullable = true)
| |-- SequenceNumber: long (nullable = true)
| |-- ShipDate: long (nullable = true)
|-- xmlns:xsi: string (nullable = true)
|-- xsi:noNamespaceSchemaLocation: string (nullable = true)
因此,看起来上面的命令确实正确读取了文件,当然,我能够连接到规范化的Azure SQL DB并将记录写入特定表中:
dfXml.write.jdbc(url=jdbcUrl, table="dest_table", mode="overwrite", properties=connectionProperties)
,但是这种方法需要设置一些嵌套循环和大量手动任务来跟踪每个表的键并尊重不利用 Spark 架构的引用完整性,所以我现在想知道是否有最佳实践(或预构建库)以更自动化和可扩展的方式完成此任务。
我希望这是一个常见的需求,所以理想情况下我会使用一个库来读取开头显示的完整 XML 结构,并自动提取信息以插入到规范化表中。
非常感谢您的任何建议。
毛罗
取决于您尝试执行的操作以及表结构的外观。我假设您正在尝试使用 Spark 处理许多文件。并且还想将数据加载到不同的规范化表中
例如,您可能希望将标题写入一个表中,标头>文件发票是一对多关系,因此这可能是另一个表。
-
当您使用 load(文件名*.xml)读取多个 xml 文件时,您 想要将文件摘要作为行标签。然后你会有多个 数据帧中的行,每个文件摘要一个。
-
您可以选择标题列到另一个数据帧中,并将其写入 到表。
-
文件发票是 struc 数组,您可以将它们分解成行 并将它们存储到另一个表中。
-
此外,如果每张发票可以包含多个项目,您可以执行另一个 分解以使它们成行并将它们存储到另一个表中
或者,您可以执行两次分解并将生成的数据帧加载到一个大型非规范化表中。
这是一篇关于爆炸如何工作的文章 https://hadoopist.wordpress.com/2016/05/16/how-to-handle-nested-dataarray-of-structures-or-multiple-explodes-in-sparkscala-and-pyspark/
我正在使用 spark-shell 在下面执行。我相信 xml 结构正在重演。 您需要创建/引用一个与 xml 文件相关的架构。 你可以利用砖房udf罐子。 然后
1.创建如下函数
sql(""" create temporary function numeric_range as brickhouse.udf.collect.NumericRange""")
2.使用架构
var df=sqlContext.read.format("com.databricks.spark.xml").option("rowTag","FileSummary").load("location of schema file")
val schema=df.schema
3.var df1=sqlContext.read.format("com.databricks.spark.xml").option("rowTag","FileSummary").schema(schema).load("location of actual xml file")
df1.registerTempTable("XML_Data")
4.您需要按如下方式展平文件发票
val df2=sql("select array_index(FileInvoices,n) as FileInvoices from XML_Data lateral view numeric_range(size(FileInvoices))n1 as n""").registerTempTable("xmlData2")
一旦每个都转换为结构,就更容易遍历或使用爆炸使用FileInvoices.InvoiceHeader.InvoiceHeaderDate
val jdbcUsername = "<username>"
val jdbcPassword = "<password>"
val jdbcHostname = "<hostname>" //typically, this is in the form or servername.database.windows.net
val jdbcPort = 1433
val jdbcDatabase ="<database>"
val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")
spark.table("").write.jdbc(jdbc_url, "xmlData2", connectionProperties)
谢谢苏巴什,阿南德。 关于Subash的答案,我没有模式文件,所以我修改了他的step2,将"实际xml文件的位置"替换为"实际xml文件的位置",它实际上有效:在步骤3之后,如果我只是运行
df2=sql("select * from XML_Data")
然后我跑
from pyspark.sql.functions import explode
df3=df2.withColumn("FileInvoices", explode(df2.FileInvoices))
display(df3)
因此,它跨多行复制 Header 的相同单个结构,其中在 FileInvoices 列中,我有一个不同的发票结构: 分解文件发票
因此,看起来我离最终目标越来越近了,但是我仍然错过了以正确的顺序自动创建记录以避免破坏引用完整性。
但在此之前,感谢您的反馈。
再次感谢,
毛罗