Exploding JSON in PySpark SQL



I want to flatten a nested JSON file into a CSV file, so that the nested JSON is parsed into rows and columns.

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Row
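# Create the session explicitly; in a notebook or the pyspark shell `spark` already exists
spark = SparkSession.builder.getOrCreate()
df = spark.read.option("multiline", "true").json("sample1.json")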
df.printSchema()
root
|-- pid: struct (nullable = true)
|    |-- Body: struct (nullable = true)
|    |    |-- Vendor: struct (nullable = true)
|    |    |    |-- RC: struct (nullable = true)
|    |    |    |    |-- Updated_From_Date: string (nullable = true)
|    |    |    |    |-- Updated_To_Date: string (nullable = true)
|    |    |    |-- RD: struct (nullable = true)
|    |    |    |    |-- Supplier: struct (nullable = true)
|    |    |    |    |    |-- Supplier_Data: struct (nullable = true)
|    |    |    |    |    |    |-- Days: long (nullable = true)
|    |    |    |    |    |    |-- Reference: struct (nullable = true)
|    |    |    |    |    |    |    |-- ID: array (nullable = true)
|    |    |    |    |    |    |    |    |-- element: string (containsNull = true)
|    |    |    |    |    |    |-- Expected: long (nullable = true)
|    |    |    |    |    |    |-- Payments: long (nullable = true)
|    |    |    |    |    |    |-- Approval: struct (nullable = true)
|    |    |    |    |    |    |    |-- ID: array (nullable = true)
|    |    |    |    |    |    |    |    |-- element: string (containsNull = true)
|    |    |    |    |    |    |-- Areas_Changed: struct (nullable = true)
|    |    |    |    |    |    |    |-- Alternate_Names: long (nullable = true)
|    |    |    |    |    |    |    |-- Attachments: long (nullable = true)
|    |    |    |    |    |    |    |-- Classifications: long (nullable = true)
|    |    |    |    |    |    |    |-- Contact_Information: long (nullable = true)

My code:

df2=(df.select(F.explode("pid").alias('pid'))
.select('pid.*')
.select(F.explode('Body').alias('Body'))
.select('Body.*')
.select((F.explode('Vendor').alias('Vendor'))
.select('Vendor.*')
.select((F.explode('RC').alias('RC'))
.select('RC.*'))))

Error: AnalysisException: cannot resolve 'explode(pid)' due to data type mismatch: input to function explode should be array or map type, not struct<Body:struct<

How can I parse the struct fields? Any help would be greatly appreciated :(

The explode function can only be used on map or array types. To access the fields of a struct type, just use the . operator.

Assuming you want the columns under RC and RD, the code should look like this.

df.select("pid.Body.Vendor.RC.*", "pid.Body.Vendor.RD.*")
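If you need every leaf field rather than just RC and RD, the dotted paths can be generated from the schema instead of typed by hand. The following is only a rough sketch, not a tested solution for your file: `leaf_paths` and `flatten` are hypothetical helper names, the `sample` dict is a hand-written fragment that mimics what `df.schema.jsonValue()` returns, and the "|" separator is an arbitrary choice. It also assumes that field names contain no dots and that every array holds strings, as in your schema.

```python
def leaf_paths(schema_json, prefix=""):
    """Yield (dotted_path, is_array) for every non-struct field.

    `schema_json` is the dict form of a StructType, i.e. the shape
    returned by df.schema.jsonValue().
    """
    for field in schema_json["fields"]:
        path = prefix + field["name"]
        ftype = field["type"]  # a plain string for primitives, a dict otherwise
        if isinstance(ftype, dict) and ftype["type"] == "struct":
            # Structs are not exploded; we simply descend with the . operator
            yield from leaf_paths(ftype, path + ".")
        else:
            is_array = isinstance(ftype, dict) and ftype["type"] == "array"
            yield path, is_array


def flatten(df):
    """Select every leaf field of `df` as a top-level column (hypothetical helper)."""
    # Imported here so that leaf_paths can be tried without Spark installed
    from pyspark.sql import functions as F

    cols = []
    for path, is_array in leaf_paths(df.schema.jsonValue()):
        col = F.col(path)
        if is_array:
            # Assumption: array<string>; join the elements so the value fits in one CSV cell
            col = F.concat_ws("|", col)
        cols.append(col.alias(path.replace(".", "_")))
    return df.select(cols)


# Hand-written fragment imitating part of the schema in the question
sample = {"type": "struct", "fields": [
    {"name": "pid", "type": {"type": "struct", "fields": [
        {"name": "Days", "type": "long"},
        {"name": "Reference", "type": {"type": "struct", "fields": [
            {"name": "ID", "type": {"type": "array",
                                    "elementType": "string",
                                    "containsNull": True}},
        ]}},
    ]}},
]}

paths = list(leaf_paths(sample))
```

With a real DataFrame, something like `flatten(df).write.option("header", "true").csv("out_dir")` would then write the result, where `out_dir` is a placeholder path. The arrays are joined because Spark's CSV writer rejects array columns. If you would rather have one row per array element, this is the one place where `F.explode_outer` on the array column is the right tool instead of `concat_ws`.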
