Flattening a dynamic nested struct (struct inside struct) in PySpark



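I'm struggling to flatten a JSON schema that has structs inside structs. The problem is that the inner struct names are dynamic, so I can't easily access them using "." notation.

The schema looks like this: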

root
|-- A: string (nullable = true)
|-- Plugins: struct (nullable = true)
|    |-- RfS: struct (nullable = true)
|    |    |-- A
|    |    |-- B
|    |-- RtW: struct (nullable = true)
|    |    |-- A
|    |    |-- B

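So A and B are fixed, but each JSON file has a different number of structs with different names (RfS, RtW, ...). There might be 2, there might be 5, and their names are dynamic, so I don't know them in advance.

How can I easily flatten this structure in a dynamic way?

The following solution uses a single select and the chain function to flatten the final columns: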

from pyspark.sql.functions import col
from itertools import chain
jsonData = """{
"A" : "some A",
"Plugins": {
"RfS": {
"A" : "RfSA",
"B" : "RfSB"
},
"RtW" : {
"A" : "RtWA",
"B" : "RtWA"
}
}
}"""
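# spark and sc are the SparkSession and SparkContext that the pyspark shell predefines
df = spark.read.json(sc.parallelize([jsonData]))
no_plug_cols = ["A"]  # columns outside Plugins, i.e. A
plug_df = df.select(*no_plug_cols, "Plugins.*")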
# plug_df.printSchema()
# root
#  |-- A: string (nullable = true)
#  |-- RfS: struct (nullable = true)
#  |    |-- A: string (nullable = true)
#  |    |-- B: string (nullable = true)
#  |-- RtW: struct (nullable = true)
#  |    |-- A: string (nullable = true)
#  |    |-- B: string (nullable = true)
# the set difference keeps only the columns that came from Plugins;
# sorted() makes the resulting column order deterministic
plugin_cols = sorted(set(plug_df.columns) - set(no_plug_cols))
icols = [(col(f"{c}.A").alias(f"{c}.A"), col(f"{c}.B").alias(f"{c}.B"))
         for c in plugin_cols]
# icols is a list of tuples, so chain(*icols) flattens it into one sequence of columns
plug_df.select(no_plug_cols + list(chain(*icols))).show()
# +------+-----+-----+-----+-----+
# |     A|RfS.A|RfS.B|RtW.A|RtW.B|
# +------+-----+-----+-----+-----+
# |some A| RfSA| RfSB| RtWA| RtWA|
# +------+-----+-----+-----+-----+
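
The answer above hardcodes the inner field names A and B. If those could vary as well, one possible variation is to walk the schema and collect every leaf path. The sketch below is not part of the original answer: the helper name leaf_paths and the underscore-joined aliases are hypothetical choices, and example_schema is a hand-written stand-in for the plain dict that df.schema.jsonValue() returns, so the path logic can be run without a Spark session.

```python
def leaf_paths(schema_json, prefix=()):
    """Yield the path (as a tuple of names) of every non-struct field.

    schema_json is assumed to have the shape of df.schema.jsonValue():
    {"type": "struct", "fields": [{"name": ..., "type": ...}, ...]},
    where a nested struct's "type" is itself such a dict.
    """
    for field in schema_json["fields"]:
        path = prefix + (field["name"],)
        ftype = field["type"]
        if isinstance(ftype, dict) and ftype.get("type") == "struct":
            # Recurse into nested structs, whatever they are called
            yield from leaf_paths(ftype, path)
        else:
            # Atomic types (and arrays/maps) are treated as leaves
            yield path


def _struct(*fields):
    # Hypothetical helper that builds a struct entry for the stand-in schema
    return {"type": "struct", "fields": list(fields)}


def _string(name):
    return {"name": name, "type": "string", "nullable": True, "metadata": {}}


# Hand-written stand-in for df.schema.jsonValue() on the example data
example_schema = _struct(
    _string("A"),
    {"name": "Plugins", "nullable": True, "metadata": {},
     "type": _struct(
         {"name": "RfS", "nullable": True, "metadata": {},
          "type": _struct(_string("A"), _string("B"))},
         {"name": "RtW", "nullable": True, "metadata": {},
          "type": _struct(_string("A"), _string("B"))},
     )},
)

paths = list(leaf_paths(example_schema))
# Backtick-quote each part so names containing dots stay unambiguous
quoted = [".".join(f"`{part}`" for part in p) for p in paths]
# Underscore aliases avoid dots in the output column names
aliases = ["_".join(p) for p in paths]

for q, a in zip(quoted, aliases):
    print(q, "->", a)

# With a live DataFrame this could be applied as (not executed here):
#   from pyspark.sql.functions import col
#   paths = list(leaf_paths(df.schema.jsonValue()))
#   flat = df.select([col(q).alias(a) for q, a in zip(quoted, aliases)])
```

Joining the path with underscores rather than dots is a design choice: a column literally named RfS.A has to be wrapped in backticks every time it is referenced later, while Plugins_RfS_A does not.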
