I'm trying to parse/flatten JSON data using the PySpark DataFrame API. The challenge is that I need to extract one data element ('Id') from the actual keys/attributes, and also keep only rows that have a 'statC' attribute. I first tried to explode the JSON object, but I get an error. Is there a better way to extract this data?
JSON input file:

{
  "changes": {
    "1": [
      { "Name": "ABC-1", "statC": { "newValue": 10 }, "column": { "notDone": true, "newStatus": "10071" } }
    ],
    "2": [
      { "Name": "ABC-2", "added": true }
    ],
    "3": [
      { "Name": "ABC-3", "column": { "notDone": true, "newStatus": "10071" } }
    ],
    "4": [
      { "Name": "ABC-4", "statC": { "newValue": 40 } }
    ],
    "5": [
      { "Name": "ABC-5", "statC": { "newValue": 50 }, "column": { "notDone": false, "done": true, "newStatus": "13685" } }
    ],
    "6": [
      { "Name": "ABC-61", "added": true },
      { "Name": "ABC-62", "statC": { "oldValue": 60 } }
    ],
    "7": [
      { "Name": "ABC-70", "added": true },
      { "Name": "ABC-71", "statC": { "newValue": 70 } },
      { "Name": "ABC-72", "statC": { "newValue": 75 } }
    ]
  },
  "startTime": 1666188060000,
  "endTime": 1667347140000,
  "activatedTime": 1666188126953,
  "now": 1667294686212
}
Desired output:

Id  Name    statC_NewValue
1   ABC-1   10
4   ABC-4   40
5   ABC-5   50
7   ABC-71  70
7   ABC-72  75
My PySpark code:
from pyspark.sql.functions import *
rawDF = spark.read.json([f"abfss://{pADLSContainer}@{pADLSGen2}.dfs.core.windows.net/{pADLSDirectory}/InputFile.json"], multiLine = "true")
idDF = rawDF.select(explode("changes").alias("changes_json"))
Error:

AnalysisException: cannot resolve 'explode(`changes`)' due to data type mismatch: input to function explode should be array or map type, not struct.
It's quite involved. First, you can create a list of the columns ("1", "2", "3", etc.) that satisfy the condition, select them as columns, transform the arrays, unpivot (melt), and explode into rows.
from pyspark.sql import functions as F

df_raw = spark.read.json(["file.json"], multiLine=True)

# Each key under "changes" ("1", "2", ...) becomes a top-level column
df = df_raw.select('changes.*')

# Index into the arrays so the schema check below sees the element structs
df0 = df.select([F.col(c)[0].alias(c) for c in df.columns])

# Keep only the columns whose elements have a statC.newValue field
cols = [c for c in df0.columns
        if 'statC' in df0.schema[c].dataType.names
        and 'newValue' in df0.schema[c].dataType['statC'].dataType.names]

# Reduce every array element to the two fields we need
df = df.select(
    [F.transform(c, lambda x: F.struct(
        x.Name.alias('Name'),
        x.statC.newValue.alias('statC_NewValue'))).alias(c)
     for c in cols]
)

# Unpivot: one row per column, carrying the column name as Id
to_melt = [f"'{c}', `{c}`" for c in df.columns]
df = df.selectExpr(f"stack({len(to_melt)}, {','.join(to_melt)}) (Id, val)")

# Explode the arrays of structs into rows and columns
df = df.selectExpr("Id", "inline(val)")
df = df.filter('statC_NewValue is not null')
df.show()
# +---+------+--------------+
# | Id| Name|statC_NewValue|
# +---+------+--------------+
# | 1| ABC-1| 10|
# | 4| ABC-4| 40|
# | 5| ABC-5| 50|
# | 7|ABC-71| 70|
# | 7|ABC-72| 75|
# +---+------+--------------+