I have a JSON file with the following schema:
root
|-- context: struct (nullable = true)
| |-- application: struct (nullable = true)
| | |-- version: string (nullable = true)
| |-- custom: struct (nullable = true)
| | |-- dimensions: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- Activity ID: string (nullable = true)
| | | | |-- Activity Type: string (nullable = true)
| | | | |-- Bot ID: string (nullable = true)
| | | | |-- Channel ID: string (nullable = true)
| | | | |-- Conversation ID: string (nullable = true)
| | | | |-- Correlation ID: string (nullable = true)
| | | | |-- From ID: string (nullable = true)
| | | | |-- Recipient ID: string (nullable = true)
| | | | |-- StatusCode: string (nullable = true)
| | | | |-- Timestamp: string (nullable = true)
| |-- data: struct (nullable = true)
| | |-- eventTime: string (nullable = true)
| | |-- isSynthetic: boolean (nullable = true)
| | |-- samplingRate: double (nullable = true)
| |-- device: struct (nullable = true)
| | |-- roleInstance: string (nullable = true)
| | |-- roleName: string (nullable = true)
| | |-- type: string (nullable = true)
| |-- location: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- clientip: string (nullable = true)
| | |-- continent: string (nullable = true)
| | |-- country: string (nullable = true)
| | |-- province: string (nullable = true)
| |-- operation: struct (nullable = true)
| | |-- id: string (nullable = true)
| | |-- parentId: string (nullable = true)
| |-- session: struct (nullable = true)
| | |-- isFirst: boolean (nullable = true)
|-- event: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- count: long (nullable = true)
| | |-- name: string (nullable = true)
|-- internal: struct (nullable = true)
| |-- data: struct (nullable = true)
| | |-- documentVersion: string (nullable = true)
| | |-- id: string (nullable = true)
Using PySpark on this schema, I need to extract only
Activity ID, Activity Type, Bot ID, Channel ID, Conversation ID, Correlation ID, From ID, Recipient ID, StatusCode, Timestamp
into a DataFrame. How can I achieve this with PySpark?
JSON file:
{
"event": [
{
"name": "Activity",
"count": 1
}
],
"internal": {
"data": {
"id": "79baca55-d168-11ea-b166-6fc861e9e21c",
"documentVersion": "1.61"
}
},
"context": {
"application": {
"version": "Wed 07/22/2020 5:37:05.58 rnUTC (fv-az461) [Build 148886] [Repo Intercom] [Branch prod] [Commit XXX] rn[XX 1.6.20-140775] [XXX 1.3.27-144047] rn"
},
"data": {
"eventTime": "2020-07-29T06:55:15.6294636Z",
"isSynthetic": false,
"samplingRate": 100
},
"cloud": {},
"device": {
"type": "PC",
"roleName": "bc-directline-southindia",
"roleInstance": "RD0003FF905CCA",
"screenResolution": {}
},
"session": {
"isFirst": false
},
"operation": {
"id": "XXX",
"parentId": "|XXXX.c4cd9570_"
},
"location": {
"clientip": "0.0.0.0",
"continent": "XX",
"country": "XXX",
"province": "XXX",
"city": "XXX"
},
"custom": {
"dimensions": [
{
"Timestamp": "XXX"
},
{
"StatusCode": "200"
},
{
"Activity ID": "JoH4veTvChCCnzchOD1Lg-f|0000001"
},
{
"From ID": "XXX"
},
{
"Correlation ID": "|54734cb21ba7f143a72ddd03fc865669.c4cd9570_"
},
{
"Channel ID": "directline"
},
{
"Recipient ID": "XXXX"
},
{
"Bot ID": "XXXX"
},
{
"Activity Type": "message"
},
{
"Conversation ID": "XXX"
}
]
}
}
}
The dimensions schema here is a challenging structure. Each key-value pair is split out into its own object, which produces a lot of nulls.
My solution looks complicated, but it eliminates those nulls without exploding the DataFrame.
Steps:
- Convert the dimensions array of structs to MapType
- Merge all the individual objects (e.g. {"Timestamp": "XXX"}, {"StatusCode": "200"}) into a single map ({"Timestamp": "XXX", "StatusCode": "200"})
- Convert the MapType back to a struct and expand it into columns
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, ArrayType, MapType, StringType

spark = SparkSession.builder.getOrCreate()
df = spark.read.json('nested.json')

# Save the schema of a dimensions element for later use
dim_ele_schema = StructType.fromJson(
    df.select('context.custom.dimensions').schema[0].jsonValue()['type']['elementType']
)

# Extract dimensions and convert it to MapType so it can be aggregated
df = (df.select('context.custom.dimensions')
        # Step 1: array<struct> -> array<map<string,string>>
        .withColumn('dim_map', F.from_json(F.to_json('dimensions'),
                                           ArrayType(MapType(StringType(), StringType()))))
        # Step 2: fold the array of maps into a single map
        .select(F.aggregate('dim_map',
                            F.create_map().cast("map<string,string>"),
                            lambda acc, x: F.map_concat(acc, x))
                 .alias('dim_map')))

# Step 3: map -> struct -> columns
df = (df.withColumn("dim", F.from_json(F.to_json("dim_map"), dim_ele_schema))
        .select("dim.*"))
Result:
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
| Activity ID|Activity Type|Bot ID|Channel ID|Conversation ID| Correlation ID|From ID|Recipient ID|StatusCode|Timestamp|
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
|JoH4veTvChCCnzchO...| message| XXXX|directline| XXX||54734cb21ba7f143...| XXX| XXXX| 200| XXX|
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
Explanation:
F.aggregate(array, initial value, merge function)
This function takes an array and aggregates it (similar to reduce in Python). What I'm doing here is merging the individual dictionaries (maps) into one map, starting from an empty map as the initial value. F.create_map().cast("map<string,string>") produces an empty map with string keys and string values. For the merge function I use map_concat to concatenate two maps (the accumulator, which is concatenated over and over, and each individual map).
Ref: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.map_concat.html
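The merge in step 2 is analogous to a plain-Python reduce over dicts. A toy sketch of that analogy (no Spark needed; the sample data mirrors elements of the dimensions array):

```python
from functools import reduce

# Each element of the dimensions array is a single-key dict
dimensions = [{"Timestamp": "XXX"}, {"StatusCode": "200"}, {"Channel ID": "directline"}]

# Start from an empty dict (the empty map) and merge each element in,
# just like F.aggregate with F.map_concat as the merge function
merged = reduce(lambda acc, x: {**acc, **x}, dimensions, {})
print(merged)  # {'Timestamp': 'XXX', 'StatusCode': '200', 'Channel ID': 'directline'}
```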
You can do a nested select on df, then use "*" to expand the named struct type into columns.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, first, monotonically_increasing_id

# prepare data
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.read.option("multiline", "true").json("./ressources/75061097.json")

# Processing
result = (df.withColumn("id", monotonically_increasing_id())
          .select("id", explode(col("context.custom.dimensions"))).select("id", "col.*")
          .groupby("id").agg(first(col("Activity ID"), ignorenulls=True).alias("Activity ID"),
                             first(col("Activity Type"), ignorenulls=True).alias("Activity Type"),
                             first(col("Bot ID"), ignorenulls=True).alias("Bot ID"),
                             first(col("Channel ID"), ignorenulls=True).alias("Channel ID"),
                             first(col("Conversation ID"), ignorenulls=True).alias("Conversation ID"),
                             first(col("Correlation ID"), ignorenulls=True).alias("Correlation ID"),
                             first(col("From ID"), ignorenulls=True).alias("From ID"),
                             first(col("Recipient ID"), ignorenulls=True).alias("Recipient ID"),
                             first(col("StatusCode"), ignorenulls=True).alias("StatusCode"),
                             first(col("Timestamp"), ignorenulls=True).alias("Timestamp"),
                             ).drop("id"))
result.show(truncate=False)
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
|Activity ID |Activity Type|Bot ID|Channel ID|Conversation ID|Correlation ID |From ID|Recipient ID|StatusCode|Timestamp|
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
|JoH4veTvChCCnzchOD1Lg-f|0000001|message |XXXX |directline|XXX ||54734cb21ba7f143a72ddd03fc865669.c4cd9570_|XXX |XXXX |200 |XXX |
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
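The groupBy/first(ignorenulls=True) step collapses the exploded rows by taking, per column, the first non-null value. A plain-Python sketch of that per-column logic (no Spark needed; the sample rows are hypothetical exploded rows for one id, each with only one dimension filled):

```python
def first_non_null(values):
    """Mimic F.first(col, ignorenulls=True): first non-None value, else None."""
    for v in values:
        if v is not None:
            return v
    return None

# Exploded rows for one id: each row has one dimension set, the rest null
rows = [
    {"Activity Type": None, "StatusCode": "200"},
    {"Activity Type": "message", "StatusCode": None},
]
collapsed = {k: first_non_null(r[k] for r in rows) for k in rows[0]}
print(collapsed)  # {'Activity Type': 'message', 'StatusCode': '200'}
```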