Parsing highly nested JSON (Prometheus) with PySpark



I would really appreciate some help parsing nested JSON data with PySpark SQL, as I am new to PySpark. The data has the following schema:

Schema

root
|-- data: struct (nullable = true)
|    |-- result: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- metric: struct (nullable = true)
|    |    |    |    |-- data0: string (nullable = true)
|    |    |    |    |-- data1: string (nullable = true)
|    |    |    |    |-- data2: string (nullable = true)
|    |    |    |    |-- data3: string (nullable = true)
|    |    |    |-- values: array (nullable = true)
|    |    |    |    |-- element: array (containsNull = true)
|    |    |    |    |    |-- element: string (containsNull = true)
|    |-- resultType: string (nullable = true)
|-- status: string (nullable = true)

Here is a sample of the JSON file (the input):

{"status":"success",
"data":{"resultType":"matrix","result":[
{"metric":{"data0":"T","data1":"O"},"values":[[90,"0"],[80,"0"]]},
{"metric":{"data0":"K","data1":"S"},"values":[[70,"0"],[60,"0"]]},
{"metric":{"data2":"J","data3":"O"},"values":[[50,"0"],[40,"0"]]}]}}

My goal: I mainly want to get the data into the following DataFrames:

1-

data0 | data1 | data2 | data3 |values

Sample output DataFrame:

data0  | data1 | data2  | data3 | values
"T"    |   "O" |    nan |    nan|   [90,"0"],[80, "0"]
"K"    |   "S" |    nan |    nan|   [70,"0"],[60, "0"]
nan    |   nan |    "J" |    "O"|   [50,"0"],[40, "0"]

2-

time | value | data0 | data1 | data2 | data3

Sample output DataFrame:

time | value |data0 | data1 | data2  | data3 
90   |   "0" |   "T"|    "O"|   nan  | nan
80   |   "0" |   "T"|    "O"|   nan  | nan
70   |   "0" |   "K"|    "S"|   nan  | nan
60   |   "0" |   "K"|    "S"|   nan  | nan
50   |   "0" |   nan|    nan|   "J"  | "O"
40   |   "0" |   nan|    nan|   "J"  | "O"

Also, if there is any way to use Spark's parallelism to speed this up, that would be great, since the JSON files being parsed are gigabytes in size.

To get the first DataFrame, you can use:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.option("multiline", True).json("test.json")

df = (
    # Pull each metric field and the values array out of data.result
    df.withColumn("data0", F.expr("transform(data.result, x -> x.metric.data0)"))
    .withColumn("data1", F.expr("transform(data.result, x -> x.metric.data1)"))
    .withColumn("data2", F.expr("transform(data.result, x -> x.metric.data2)"))
    .withColumn("data3", F.expr("transform(data.result, x -> x.metric.data3)"))
    .withColumn("values", F.expr("transform(data.result, x -> x.values)"))
    # One index per element of data.result, exploded to one row per result
    .withColumn("items", F.array(F.lit(0), F.lit(1), F.lit(2)))
    .withColumn("items", F.explode(F.col("items")))
    .withColumn("data0", F.col("data0").getItem(F.col("items")))
    .withColumn("data1", F.col("data1").getItem(F.col("items")))
    .withColumn("data2", F.col("data2").getItem(F.col("items")))
    .withColumn("data3", F.col("data3").getItem(F.col("items")))
    .withColumn("values", F.col("values").getItem(F.col("items")))
    .drop("data", "status", "items")
)

Result:

root
|-- data0: string (nullable = true)
|-- data1: string (nullable = true)
|-- data2: string (nullable = true)
|-- data3: string (nullable = true)
|-- values: array (nullable = true)
|    |-- element: array (containsNull = true)
|    |    |-- element: string (containsNull = true)
+-----+-----+-----+-----+------------------+
|data0|data1|data2|data3|values            |
+-----+-----+-----+-----+------------------+
|T    |O    |null |null |[[90, 0], [80, 0]]|
|K    |S    |null |null |[[70, 0], [60, 0]]|
|null |null |J    |O    |[[50, 0], [40, 0]]|
+-----+-----+-----+-----+------------------+
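As a side note, the same first DataFrame can also be built without hardcoding the number of results, by exploding data.result directly and expanding the metric struct. A minimal sketch, assuming the same multiline test.json as above:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.option("multiline", True).json("test.json")

# One row per element of data.result, then expand the metric struct into columns
df1 = df.select(F.explode("data.result").alias("r")).select(
    "r.metric.*", F.col("r.values").alias("values")
)
df1.show(truncate=False)

Metric fields that are missing from a given result (for example data2 and data3 in the first two results) simply come out as null, exactly as in the output above.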

To get the second one, it is the same, but with an extra explode for the values:

df = (
    df.withColumn("data0", F.expr("transform(data.result, x -> x.metric.data0)"))
    .withColumn("data1", F.expr("transform(data.result, x -> x.metric.data1)"))
    .withColumn("data2", F.expr("transform(data.result, x -> x.metric.data2)"))
    .withColumn("data3", F.expr("transform(data.result, x -> x.metric.data3)"))
    .withColumn("values", F.expr("transform(data.result, x -> x.values)"))
    .withColumn("items", F.array(F.lit(0), F.lit(1), F.lit(2)))
    .withColumn("items", F.explode(F.col("items")))
    .withColumn("data0", F.col("data0").getItem(F.col("items")))
    .withColumn("data1", F.col("data1").getItem(F.col("items")))
    .withColumn("data2", F.col("data2").getItem(F.col("items")))
    .withColumn("data3", F.col("data3").getItem(F.col("items")))
    .withColumn("values", F.col("values").getItem(F.col("items")))
    # Extra step: one row per [time, value] pair
    .withColumn("values", F.explode("values"))
    .withColumn("time", F.col("values").getItem(0))
    .withColumn("value", F.col("values").getItem(1))
    .drop("data", "status", "items", "values")
)

Result:

root
|-- data0: string (nullable = true)
|-- data1: string (nullable = true)
|-- data2: string (nullable = true)
|-- data3: string (nullable = true)
|-- time: string (nullable = true)
|-- value: string (nullable = true)
+-----+-----+-----+-----+----+-----+
|data0|data1|data2|data3|time|value|
+-----+-----+-----+-----+----+-----+
|T    |O    |null |null |90  |0    |
|T    |O    |null |null |80  |0    |
|K    |S    |null |null |70  |0    |
|K    |S    |null |null |60  |0    |
|null |null |J    |O    |50  |0    |
|null |null |J    |O    |40  |0    |
+-----+-----+-----+-----+----+-----+
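The explode-based variant works for the second DataFrame as well. A minimal sketch, again assuming df was read from test.json with the multiline option:

# One row per result, then one row per [time, value] pair inside it
df2 = (
    df.select(F.explode("data.result").alias("r"))
    .select("r.metric.*", F.explode("r.values").alias("v"))
    .withColumn("time", F.col("v").getItem(0))
    .withColumn("value", F.col("v").getItem(1))
    .drop("v")
)
df2.show(truncate=False)

The column order differs slightly from the example above (the metric columns come first), but the rows are the same.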

  • Update:

An example that automates the data names and the number of results:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json

# First pass with plain json to collect the metric field names and the number of results
data_names = []
number_of_results = 0
with open("test.json", "r") as f_in:
    raw_data = json.load(f_in)
    for item in raw_data["data"]["result"]:
        number_of_results += 1
        for key in item["metric"].keys():
            if key not in data_names:
                data_names.append(key)

spark = SparkSession.builder.getOrCreate()
df = spark.read.option("multiline", True).json("test.json")

# Same logic as before, but driven by the discovered names and result count
for data_name in data_names:
    df = df.withColumn(
        data_name, F.expr(f"transform(data.result, x -> x.metric.{data_name})")
    )
df = (
    df.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
    .withColumn("items", F.array(*[F.lit(x) for x in range(0, number_of_results)]))
    .withColumn("items", F.explode(F.col("items")))
)
for data_name in data_names:
    df = df.withColumn(data_name, F.col(data_name).getItem(F.col("items")))
df = df.withColumn("values", F.col("values").getItem(F.col("items"))).drop(
    "data", "status", "items"
)

The result is the first DataFrame (the same as above).
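For files in the gigabyte range, the extra json.load pass over the whole file runs on the driver and can itself be slow. One possible variant, assuming the metric field names all appear in the schema Spark infers, is to derive data_names and number_of_results from the DataFrame instead:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.option("multiline", True).json("test.json")

# Metric field names from the inferred schema (data.result is an array of structs)
metric_type = df.schema["data"].dataType["result"].dataType.elementType["metric"].dataType
data_names = [f.name for f in metric_type.fields]

# Number of elements in data.result (the sample file has a single top-level row)
number_of_results = df.select(F.size("data.result")).first()[0]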
