How to parse nested JSON with PySpark



I have a JSON file with the following schema:

root
|-- context: struct (nullable = true)
|    |-- application: struct (nullable = true)
|    |    |-- version: string (nullable = true)
|    |-- custom: struct (nullable = true)
|    |    |-- dimensions: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- Activity ID: string (nullable = true)
|    |    |    |    |-- Activity Type: string (nullable = true)
|    |    |    |    |-- Bot ID: string (nullable = true)
|    |    |    |    |-- Channel ID: string (nullable = true)
|    |    |    |    |-- Conversation ID: string (nullable = true)
|    |    |    |    |-- Correlation ID: string (nullable = true)
|    |    |    |    |-- From ID: string (nullable = true)
|    |    |    |    |-- Recipient ID: string (nullable = true)
|    |    |    |    |-- StatusCode: string (nullable = true)
|    |    |    |    |-- Timestamp: string (nullable = true)
|    |-- data: struct (nullable = true)
|    |    |-- eventTime: string (nullable = true)
|    |    |-- isSynthetic: boolean (nullable = true)
|    |    |-- samplingRate: double (nullable = true)
|    |-- device: struct (nullable = true)
|    |    |-- roleInstance: string (nullable = true)
|    |    |-- roleName: string (nullable = true)
|    |    |-- type: string (nullable = true)
|    |-- location: struct (nullable = true)
|    |    |-- city: string (nullable = true)
|    |    |-- clientip: string (nullable = true)
|    |    |-- continent: string (nullable = true)
|    |    |-- country: string (nullable = true)
|    |    |-- province: string (nullable = true)
|    |-- operation: struct (nullable = true)
|    |    |-- id: string (nullable = true)
|    |    |-- parentId: string (nullable = true)
|    |-- session: struct (nullable = true)
|    |    |-- isFirst: boolean (nullable = true)
|-- event: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- count: long (nullable = true)
|    |    |-- name: string (nullable = true)
|-- internal: struct (nullable = true)
|    |-- data: struct (nullable = true)
|    |    |-- documentVersion: string (nullable = true)
|    |    |-- id: string (nullable = true)

From this schema, using PySpark I need to extract only

Activity ID, Activity Type, Bot ID, Channel ID, Conversation ID, Correlation ID, From ID, Recipient ID, StatusCode, Timestamp

into a DataFrame. How can I achieve this with PySpark?

JSON file:

{
  "event": [
    {
      "name": "Activity",
      "count": 1
    }
  ],
  "internal": {
    "data": {
      "id": "79baca55-d168-11ea-b166-6fc861e9e21c",
      "documentVersion": "1.61"
    }
  },
  "context": {
    "application": {
      "version": "Wed 07/22/2020  5:37:05.58 rnUTC (fv-az461) [Build 148886] [Repo Intercom] [Branch prod] [Commit XXX] rn[XX 1.6.20-140775]  [XXX 1.3.27-144047] rn"
    },
    "data": {
      "eventTime": "2020-07-29T06:55:15.6294636Z",
      "isSynthetic": false,
      "samplingRate": 100
    },
    "cloud": {},
    "device": {
      "type": "PC",
      "roleName": "bc-directline-southindia",
      "roleInstance": "RD0003FF905CCA",
      "screenResolution": {}
    },
    "session": {
      "isFirst": false
    },
    "operation": {
      "id": "XXX",
      "parentId": "|XXXX.c4cd9570_"
    },
    "location": {
      "clientip": "0.0.0.0",
      "continent": "XX",
      "country": "XXX",
      "province": "XXX",
      "city": "XXX"
    },
    "custom": {
      "dimensions": [
        { "Timestamp": "XXX" },
        { "StatusCode": "200" },
        { "Activity ID": "JoH4veTvChCCnzchOD1Lg-f|0000001" },
        { "From ID": "XXX" },
        { "Correlation ID": "|54734cb21ba7f143a72ddd03fc865669.c4cd9570_" },
        { "Channel ID": "directline" },
        { "Recipient ID": "XXXX" },
        { "Bot ID": "XXXX" },
        { "Activity Type": "message" },
        { "Conversation ID": "XXX" }
      ]
    }
  }
}

The dimensions schema here is a challenging structure. Each key-value pair is split into its own object, which produces a lot of nulls.

My solution looks complicated, but it eliminates those nulls without exploding the DataFrame.
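To see why this matters, a naive explode plus "col.*" (a quick sketch, assuming an active SparkSession named spark and the same nested.json file) turns the single record into ten rows, each carrying nine null columns:

from pyspark.sql import functions as F

# Naive flattening for comparison: every dimension object fills exactly one
# column, so one record explodes into 10 rows with 9 nulls each.
df = spark.read.option('multiline', 'true').json('nested.json')
(df.select(F.explode('context.custom.dimensions').alias('d'))
   .select('d.*')
   .show())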

Steps:

  1. Convert the dimensions structs to MapType
  2. Merge all the individual objects (e.g. { "Timestamp": "XXX" }, { "StatusCode": "200" }) into a single map ({"Timestamp": "XXX", "StatusCode": "200"})
  3. Convert the MapType back to a struct and expand it into columns

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, ArrayType, MapType, StringType

# The file shown above is pretty-printed, so read it in multiline mode
df = spark.read.option('multiline', 'true').json('nested.json')

# Save the dimensions element schema for later use
dim_ele_schema = StructType.fromJson(
    df.select('context.custom.dimensions').schema[0].jsonValue()['type']['elementType']
)

# Extract dimensions and convert it to MapType to aggregate
df = (df.select('context.custom.dimensions')
        # Step 1: array<struct> -> array<map<string,string>>
        .withColumn('dim_map', F.from_json(F.to_json('dimensions'),
                                           ArrayType(MapType(StringType(), StringType()))))
        # Step 2: fold the array of single-entry maps into one map
        .select(F.aggregate('dim_map',
                            F.create_map().cast('map<string,string>'),
                            lambda acc, x: F.map_concat(acc, x))
                 .alias('dim_map')))

# Step 3: map -> struct with the saved schema, then expand into columns
df = (df.withColumn('dim', F.from_json(F.to_json('dim_map'), dim_ele_schema))
        .select('dim.*'))

Result:

+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
|         Activity ID|Activity Type|Bot ID|Channel ID|Conversation ID|      Correlation ID|From ID|Recipient ID|StatusCode|Timestamp|
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
|JoH4veTvChCCnzchO...|      message|  XXXX|directline|            XXX||54734cb21ba7f143...|    XXX|        XXXX|       200|      XXX|
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+

Explanation:

F.aggregate(array, initial value, merge function)

This function takes an array and folds it (similar to reduce in Python). What I am doing here is merging the single-entry maps into one map, starting from an empty map as the initial value. F.create_map().cast("map<string,string>") produces an empty map with string keys and string values. For the merge function I use map_concat to concatenate two maps: the accumulator, which keeps growing, and each individual map in turn.
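As a standalone illustration of that fold (toy data and column names assumed, not from the original post):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
# One array column holding two single-entry maps
toy = spark.createDataFrame([([{"a": "1"}, {"b": "2"}],)],
                            "maps array<map<string,string>>")
toy.select(F.aggregate("maps",
                       F.create_map().cast("map<string,string>"),
                       lambda acc, x: F.map_concat(acc, x)).alias("merged")
          ).show(truncate=False)
# +----------------+
# |merged          |
# +----------------+
# |{a -> 1, b -> 2}|
# +----------------+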

Reference: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.map_concat.html

Alternatively, you can run a nested select on the df and then use ".*" to turn the named struct type into columns.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, first, monotonically_increasing_id

# prepare data
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.read.option("multiline", "true").json("./ressources/75061097.json")

# Processing: tag each record, explode the dimensions array, flatten the
# structs, then collapse the rows back with first(..., ignorenulls=True)
result = (df.withColumn("id", monotonically_increasing_id())
          .select("id", explode(col("context.custom.dimensions")))
          .select("id", "col.*")
          .groupby("id")
          .agg(first(col("Activity ID"), ignorenulls=True).alias("Activity ID"),
               first(col("Activity Type"), ignorenulls=True).alias("Activity Type"),
               first(col("Bot ID"), ignorenulls=True).alias("Bot ID"),
               first(col("Channel ID"), ignorenulls=True).alias("Channel ID"),
               first(col("Conversation ID"), ignorenulls=True).alias("Conversation ID"),
               first(col("Correlation ID"), ignorenulls=True).alias("Correlation ID"),
               first(col("From ID"), ignorenulls=True).alias("From ID"),
               first(col("Recipient ID"), ignorenulls=True).alias("Recipient ID"),
               first(col("StatusCode"), ignorenulls=True).alias("StatusCode"),
               first(col("Timestamp"), ignorenulls=True).alias("Timestamp"))
          .drop("id"))
result.show(truncate=False)

+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
|Activity ID                    |Activity Type|Bot ID|Channel ID|Conversation ID|Correlation ID                             |From ID|Recipient ID|StatusCode|Timestamp|
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
|JoH4veTvChCCnzchOD1Lg-f|0000001|message      |XXXX  |directline|XXX            ||54734cb21ba7f143a72ddd03fc865669.c4cd9570_|XXX    |XXXX        |200       |XXX      |
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
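If the column list grows, the ten first(...) expressions could also be built in a loop (a sketch under the same assumptions as the code above; wanted is simply the list of field names to keep):

wanted = ["Activity ID", "Activity Type", "Bot ID", "Channel ID",
          "Conversation ID", "Correlation ID", "From ID", "Recipient ID",
          "StatusCode", "Timestamp"]
# Build one first(..., ignorenulls=True) aggregation per wanted column
aggs = [first(col(c), ignorenulls=True).alias(c) for c in wanted]
result = (df.withColumn("id", monotonically_increasing_id())
          .select("id", explode(col("context.custom.dimensions")))
          .select("id", "col.*")
          .groupby("id").agg(*aggs)
          .drop("id"))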
