How to extract data from JSON key/value pairs when the key itself carries a data value



I'm trying to parse/flatten JSON data using the PySpark DataFrame API. The challenge is that I need to extract one data element ('Id') from the key itself, and also keep only the rows that have a 'statC' attribute. I first tried to explode the JSON object, but got an error. Is there a better way to extract this data?

JSON input file:
{
  "changes": {
    "1": [
      {
        "Name": "ABC-1",
        "statC": {
          "newValue": 10
        },
        "column": {
          "notDone": true,
          "newStatus": "10071"
        }
      }
    ],
    "2": [
      {
        "Name": "ABC-2",
        "added": true
      }
    ],
    "3": [
      {
        "Name": "ABC-3",
        "column": {
          "notDone": true,
          "newStatus": "10071"
        }
      }
    ],
    "4": [
      {
        "Name": "ABC-4",
        "statC": {
          "newValue": 40
        }
      }
    ],
    "5": [
      {
        "Name": "ABC-5",
        "statC": {
          "newValue": 50
        },
        "column": {
          "notDone": false,
          "done": true,
          "newStatus": "13685"
        }
      }
    ],
    "6": [
      {
        "Name": "ABC-61",
        "added": true
      },
      {
        "Name": "ABC-62",
        "statC": {
          "oldValue": 60
        }
      }
    ],
    "7": [
      {
        "Name": "ABC-70",
        "added": true
      },
      {
        "Name": "ABC-71",
        "statC": {
          "newValue": 70
        }
      },
      {
        "Name": "ABC-72",
        "statC": {
          "newValue": 75
        }
      }
    ]
  },
  "startTime": 1666188060000,
  "endTime": 1667347140000,
  "activatedTime": 1666188126953,
  "now": 1667294686212
}

Desired output:

Id  Name    statC_NewValue
1   ABC-1   10
4   ABC-4   40
5   ABC-5   50
7   ABC-71  70
7   ABC-72  75
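
For reference, the selection rule (keep an entry only when its statC object contains newValue, and take the Id from the surrounding key) can be sanity-checked in plain Python on an abridged copy of the input:

```python
import json

# Abridged copy of the input above (keys "1", "2" and "7" only);
# the full file follows the same shape.
raw = '''
{
  "changes": {
    "1": [{"Name": "ABC-1", "statC": {"newValue": 10},
           "column": {"notDone": true, "newStatus": "10071"}}],
    "2": [{"Name": "ABC-2", "added": true}],
    "7": [{"Name": "ABC-70", "added": true},
          {"Name": "ABC-71", "statC": {"newValue": 70}},
          {"Name": "ABC-72", "statC": {"newValue": 75}}]
  }
}
'''

rows = []
for key, entries in json.loads(raw)["changes"].items():
    for entry in entries:
        # Keep only entries whose statC object carries a newValue
        new_value = entry.get("statC", {}).get("newValue")
        if new_value is not None:
            rows.append((key, entry["Name"], new_value))

print(rows)  # [('1', 'ABC-1', 10), ('7', 'ABC-71', 70), ('7', 'ABC-72', 75)]
```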

My PySpark code:

from pyspark.sql.functions import * 
rawDF = spark.read.json([f"abfss://{pADLSContainer}@{pADLSGen2}.dfs.core.windows.net/{pADLSDirectory}/InputFile.json"], multiLine = "true")
idDF = rawDF.select(explode("changes").alias("changes_json"))

Error:

AnalysisException: cannot resolve 'explode(changes)' due to data type mismatch: input to function explode should be array or map type, not struct.

This takes a few steps. First, build the list of top-level keys ('1', '2', '3', …) whose arrays contain an entry with `statC.newValue`, and select those keys as columns (explode fails because Spark infers `changes` as a struct, not a map). Then transform each array element down to just the fields you need, unpivot the columns into rows, and expand the structs into columns.

from pyspark.sql import functions as F

df_raw = spark.read.json(["file.json"], multiLine=True)

# Flatten the 'changes' struct: every key ("1", "2", ...) becomes a column
df = df_raw.select('changes.*')

# Look at the first element of each array to inspect the inferred struct schema
df0 = df.select([F.col(c)[0].alias(c) for c in df.columns])

# Keep only the columns whose element struct has a statC.newValue field
cols = [c for c in df0.columns
        if 'statC' in df0.schema[c].dataType.names
        and 'newValue' in df0.schema[c].dataType['statC'].dataType.names]

# Reduce each array element to just (Name, statC_NewValue)
df = df.select(
    [F.transform(c, lambda x: F.struct(
        x.Name.alias('Name'),
        x.statC.newValue.alias('statC_NewValue'))).alias(c)
     for c in cols]
)

# Unpivot: stack turns the columns into (Id, val) rows,
# then inline explodes each array of structs into one row per struct
to_melt = [f"'{c}', `{c}`" for c in df.columns]
df = df.selectExpr(f"stack({len(to_melt)}, {','.join(to_melt)}) (Id, val)")
df = df.selectExpr("Id", "inline(val)")

# Drop entries without a statC.newValue (e.g. ABC-70)
df = df.filter('statC_NewValue is not null')

df.show()
# +---+------+--------------+
# | Id|  Name|statC_NewValue|
# +---+------+--------------+
# |  1| ABC-1|            10|
# |  4| ABC-4|            40|
# |  5| ABC-5|            50|
# |  7|ABC-71|            70|
# |  7|ABC-72|            75|
# +---+------+--------------+
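
If the `stack`/`inline` step is unclear, here is a rough plain-Python analogy of what it does (the column names and tuples below are illustrative, not Spark API):

```python
# Conceptual view of stack + inline, assuming two columns "1" and "4",
# each holding a list of (Name, statC_NewValue) structs.
columns = {
    "1": [("ABC-1", 10)],
    "4": [("ABC-4", 40)],
}

# stack: unpivot columns into (Id, val) rows
stacked = [(col_id, val) for col_id, val in columns.items()]

# inline: explode each array of structs into one row per struct,
# with the struct fields promoted to columns
rows = [(col_id, name, new_value)
        for col_id, arr in stacked
        for (name, new_value) in arr]

print(rows)  # [('1', 'ABC-1', 10), ('4', 'ABC-4', 40)]
```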
