PySpark: iterating over multiline nested JSON to build a DataFrame



I need some help iterating over the following JSON in PySpark and building a DataFrame from it:

{
  "success": true,
  "result": {
    "0x00e01a648ff41346cdeb873182383333d2184dd1": {
      "id": 130,
      "name": "xn--mytherwallet-fvb.com",
      "url": "http://xn--mytherwallet-fvb.com",
      "coin": "ETH",
      "category": "Phishing",
      "subcategory": "MyEtherWallet",
      "description": "Homoglyph",
      "addresses": [
        "0x00e01a648ff41346cdeb873182383333d2184dd1",
        "0x11e01a648ff41346cdeb873182383333d2184dd1"
      ],
      "reporter": "MyCrypto",
      "status": "Offline"
    },
    "0x858457daa7e087ad74cdeeceab8419079bc2ca03": {
      "id": 1200,
      "name": "myetherwallet.in",
      "url": "http://myetherwallet.in",
      "coin": "ETH",
      "category": "Phishing",
      "subcategory": "MyEtherWallet",
      "addresses": ["0x858457daa7e087ad74cdeeceab8419079bc2ca03"],
      "reporter": "MyCrypto",
      "ip": "159.8.210.35",
      "nameservers": [
        "ns2.eftydns.com",
        "ns1.eftydns.com"
      ],
      "status": "Active"
    }
  }
}

I need to build a DataFrame that lists the addresses.

I reformatted the JSON into a single-line form that Spark's default JSON reader can parse.

{"success": true, "result": {"0x00e01a648ff41346cdeb873182383333d2184dd1": {"id": 130, "name": "xn--mytherwallet-fvb.com", "url": "http://xn--mytherwallet-fvb.com", "coin": "ETH", "category": "Phishing", "subcategory": "MyEtherWallet", "description": "Homoglyph", "addresses": ["0x00e01a648ff41346cdeb873182383333d2184dd1", "0x11e01a648ff41346cdeb873182383333d2184dd1"], "reporter": "MyCrypto", "status": "Offline"}, "0x858457daa7e087ad74cdeeceab8419079bc2ca03": {"id": 1200, "name": "myetherwallet.in", "url": "http://myetherwallet.in", "coin": "ETH", "category": "Phishing", "subcategory": "MyEtherWallet", "addresses": ["0x858457daa7e087ad74cdeeceab8419079bc2ca03"], "reporter": "MyCrypto", "ip": "159.8.210.35", "nameservers": ["ns2.eftydns.com", "ns1.eftydns.com"], "status": "Active"}}}
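The answer does not show how the file was reformatted. A minimal sketch of one way to do it with Python's standard `json` module follows, assuming the whole document fits in memory. The helper name `to_single_line` is hypothetical and not part of the original answer.

```python
import json


def to_single_line(multiline_text: str) -> str:
    """Re-serialize a pretty-printed JSON document onto a single line.

    Spark's default JSON reader expects one complete JSON value per line,
    so collapsing the document makes it readable without extra options.
    """
    parsed = json.loads(multiline_text)  # raises ValueError on invalid JSON
    return json.dumps(parsed)            # default separators emit no newlines


# Small made-up document in the same shape as the question's data.
pretty = """{
  "success": true,
  "result": {
    "0xabc": {"id": 1, "addresses": ["0xabc", "0xdef"]}
  }
}"""

single = to_single_line(pretty)
print(single)
```

To convert a file, read it with `open(...).read()`, pass the text to the helper, and write the returned string to a new file that Spark then reads.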

Reading the JSON (Scala):

val df = spark.read.json("/my_data.json")
df.printSchema()
df.show(false)

Output:

root
|-- result: struct (nullable = true)
|    |-- 0x00e01a648ff41346cdeb873182383333d2184dd1: struct (nullable = true)
|    |    |-- addresses: array (nullable = true)
|    |    |    |-- element: string (containsNull = true)
|    |    |-- category: string (nullable = true)
|    |    |-- coin: string (nullable = true)
|    |    |-- description: string (nullable = true)
|    |    |-- id: long (nullable = true)
|    |    |-- name: string (nullable = true)
|    |    |-- reporter: string (nullable = true)
|    |    |-- status: string (nullable = true)
|    |    |-- subcategory: string (nullable = true)
|    |    |-- url: string (nullable = true)
|    |-- 0x858457daa7e087ad74cdeeceab8419079bc2ca03: struct (nullable = true)
|    |    |-- addresses: array (nullable = true)
|    |    |    |-- element: string (containsNull = true)
|    |    |-- category: string (nullable = true)
|    |    |-- coin: string (nullable = true)
|    |    |-- id: long (nullable = true)
|    |    |-- ip: string (nullable = true)
|    |    |-- name: string (nullable = true)
|    |    |-- nameservers: array (nullable = true)
|    |    |    |-- element: string (containsNull = true)
|    |    |-- reporter: string (nullable = true)
|    |    |-- status: string (nullable = true)
|    |    |-- subcategory: string (nullable = true)
|    |    |-- url: string (nullable = true)
|-- success: boolean (nullable = true)
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|result                                                                                                                                                                                                                                                                                                                                                                                                                                     |success|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|[[WrappedArray(0x00e01a648ff41346cdeb873182383333d2184dd1, 0x11e01a648ff41346cdeb873182383333d2184dd1),Phishing,ETH,Homoglyph,130,xn--mytherwallet-fvb.com,MyCrypto,Offline,MyEtherWallet,http://xn--mytherwallet-fvb.com],[WrappedArray(0x858457daa7e087ad74cdeeceab8419079bc2ca03),Phishing,ETH,1200,159.8.210.35,myetherwallet.in,WrappedArray(ns2.eftydns.com, ns1.eftydns.com),MyCrypto,Active,MyEtherWallet,http://myetherwallet.in]]|true   |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+

In PySpark you can do the following. There is no need to reformat the JSON, since it is already well-formed; you only need to pass .option('multiline', True) to the JSON reader.

df = spark.read.option('multiline', True).json('test.json')

To get the addresses:

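import pyspark.sql.functions as F

# Promote each key under `result` to its own top-level struct column.
df2 = df.select('result.*')

# Concatenate every struct's `addresses` array, then explode to one row per address.
# Note: F.concat accepts array columns only in Spark 2.4 and later.
df3 = df2.select(
    F.explode(
        F.concat(
            *[F.col(f'{col}.addresses') for col in df2.columns]
        )
    ).alias('addresses')
)
df3.show(truncate=False)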
+------------------------------------------+
|addresses                                 |
+------------------------------------------+
|0x00e01a648ff41346cdeb873182383333d2184dd1|
|0x11e01a648ff41346cdeb873182383333d2184dd1|
|0x858457daa7e087ad74cdeeceab8419079bc2ca03|
+------------------------------------------+
