PySpark: exploding nested JSON into multiple columns and rows



I'm new to PySpark and not yet familiar with all the functions and features it offers.

I have a PySpark DataFrame with a column that contains nested JSON values, for example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
rows = [['Alice', """{
    "level1": {
        "tag1": {
            "key1": "value1",
            "key2": "value2",
            "key3": "value3"
        }
    },
    "level2": {
        "tag1": {
            "key1": "value1"
        }
    },
    "level3": {
        "tag1": {
            "key1": "value1",
            "key2": "value2",
            "key3": "value3"
        },
        "tag2": {
            "key1": "value1"
        }
    }
}"""
]]
columns = ['name', 'Levels']
df = spark.createDataFrame(rows, columns)

The number of levels, tags, and key-value pairs in each tag is outside my control and may change.

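My goal is to create a new DataFrame from the original one, with one row for each (level, tag, key, value) tuple and a column for each of those fields. The single row in the example would therefore produce 8 new rows of the form: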
(name, level, tag, key, value)
Alice, level1, tag1, key1, value1
Alice, level1, tag1, key2, value2
Alice, level1, tag1, key3, value3

Alice, level2, tag1, key1, value1

Alice, level3, tag1, key1, value1
Alice, level3, tag1, key2, value2
Alice, level3, tag1, key3, value3
Alice, level3, tag2, key1, value1

The first step uses a udf to convert the JSON into an array of (level, tag, key, value) tuples. The second step explodes that array to get one row per tuple:

from pyspark.sql import functions as F
from pyspark.sql import types as T

df = ...  # the use of asDict() below implies Levels is a struct column, not a raw string

def to_array(lvl):
    # Flatten the nested Row into a list of (level, tag, key, value) tuples.
    def to_tuple(lvl):
        levels = lvl.asDict()
        for l in levels:
            tags = levels[l].asDict()
            for t in tags:
                keys = tags[t].asDict()
                for k in keys:
                    v = keys[k]
                    yield (l, t, k, v)
    return list(to_tuple(lvl))

# Return type of the udf: an array of (level, tag, key, value) structs.
outputschema = T.ArrayType(T.StructType([
    T.StructField("level", T.StringType(), True),
    T.StructField("tag", T.StringType(), True),
    T.StructField("key", T.StringType(), True),
    T.StructField("value", T.StringType(), True)
]))
to_array_udf = F.udf(to_array, outputschema)

# Explode the array into one row per tuple, then expand the struct into columns.
(df.withColumn("tmp", to_array_udf("Levels"))
    .withColumn("tmp", F.explode("tmp"))
    .select("Levels", "tmp.*")
    .show())

Output:

+--------------------+------+----+----+------+
|              Levels| level| tag| key| value|
+--------------------+------+----+----+------+
|{{{value1, value2...|level1|tag1|key1|value1|
|{{{value1, value2...|level1|tag1|key2|value2|
|{{{value1, value2...|level1|tag1|key3|value3|
|{{{value1, value2...|level2|tag1|key1|value1|
|{{{value1, value2...|level3|tag1|key1|value1|
|{{{value1, value2...|level3|tag1|key2|value2|
|{{{value1, value2...|level3|tag1|key3|value3|
|{{{value1, value2...|level3|tag2|key1|value1|
+--------------------+------+----+----+------+
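
If Levels is still a plain JSON string, as in the question, the flattening step might be sketched in plain Python with json.loads instead of asDict(). The function name flatten_levels and the sample string are hypothetical, and the sketch assumes the column holds valid JSON (double-quoted strings, no trailing commas) with exactly three levels of nesting:

```python
import json


def flatten_levels(levels_json):
    """Turn a nested JSON string into a list of (level, tag, key, value) tuples."""
    # A null cell yields no tuples instead of raising.
    if levels_json is None:
        return []
    levels = json.loads(levels_json)
    # Walk the three nesting levels: level -> tag -> key -> value.
    return [
        (level, tag, key, value)
        for level, tags in levels.items()
        for tag, keys in tags.items()
        for key, value in keys.items()
    ]


# Hypothetical sample, shortened from the question's data.
sample = (
    '{"level1": {"tag1": {"key1": "value1", "key2": "value2"}},'
    ' "level2": {"tag2": {"key1": "value1"}}}'
)
rows = flatten_levels(sample)
for row in rows:
    print(row)
```

A function like this could presumably be registered the same way as to_array, with F.udf(flatten_levels, outputschema), so that the explode and select steps stay unchanged.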
