I have the following JSON data:
{
"3200": {
"id": "3200",
"value": [
"cat",
"dog"
]
},
"2000": {
"id": "2000",
"value": [
"bird"
]
},
"2500": {
"id": "2500",
"value": [
"kitty"
]
},
"3650": {
"id": "3650",
"value": [
"horse"
]
}
}
After loading this data with Spark, printSchema shows the following schema:
root
 |-- 3200: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- 2000: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- 2500: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- 3650: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
I want to get the following DataFrame:
id    value
3200  cat
2000  bird
2500  kitty
3200  dog
3650  horse
How can I parse the data to get the expected output?
Using spark-sql:
DataFrame step (same as in Mohana's answer):
val df = spark.read.json(Seq(jsonData).toDS())
Create a temp view:
df.createOrReplaceTempView("df")
Query and result:
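// Build "`2000`.id,`2500`.id,..." and "`2000`.value,`2500`.value,..." from the column names
val cols_k = df.columns.map( x => s"`${x}`.id" ).mkString(",")
val cols_v = df.columns.map( x => s"`${x}`.value" ).mkString(",")
// t1: one map<id, array<value>>; t2: one row per map entry; final select: one row per array element
spark.sql(s"""
with t1 as ( select map_from_arrays(array(${cols_k}), array(${cols_v})) s from df ),
     t2 as ( select explode(s) (key, value) from t1 )
select key, explode(value) value from t2
""").show(false)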
+----+-----+
|key |value|
+----+-----+
|2000|bird |
|2500|kitty|
|3200|cat  |
|3200|dog  |
|3650|horse|
+----+-----+
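The same map_from_arrays approach can also be written with the DataFrame API, which avoids assembling a SQL string by hand. This is a minimal spark-shell-style sketch, assuming Spark 2.4 or later (where map_from_arrays is available) and a one-line copy of the question's JSON; the names ids, values and result are only illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// One-line copy of the question's data (assumption: same content as above)
val jsonData = """{"3200":{"id":"3200","value":["cat","dog"]},"2000":{"id":"2000","value":["bird"]},"2500":{"id":"2500","value":["kitty"]},"3650":{"id":"3650","value":["horse"]}}"""
val df = spark.read.json(Seq(jsonData).toDS())

// Backticks are needed because the column names are numeric strings
val ids    = df.columns.map(c => col(s"`$c`.id"))
val values = df.columns.map(c => col(s"`$c`.value"))

val result = df
  .select(map_from_arrays(array(ids: _*), array(values: _*)).as("s")) // map<id, array<value>>
  .select(explode($"s").as(Seq("key", "value")))                     // one row per key
  .select($"key", explode($"value").as("value"))                      // one row per element

result.show(false)
```

Row order follows the column order of df; add an orderBy if a particular order is required.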
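You can use the stack() function to transpose the DataFrame, turning the struct columns into rows, then take the id field of each struct as key and explode the value array with explode_outer.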
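import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode_outer, expr}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._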
val jsonData = """{
| "3200": {
| "id": "3200",
| "value": [
| "cat",
| "dog"
| ]
| },
| "2000": {
| "id": "2000",
| "value": [
| "bird"
| ]
| },
| "2500": {
| "id": "2500",
| "value": [
| "kitty"
| ]
| },
| "3650": {
| "id": "3650",
| "value": [
| "horse"
| ]
| }
|}
|""".stripMargin
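val df = spark.read.json(Seq(jsonData).toDS())
// stack(4, *) turns the 4 struct columns into 4 rows of a single struct column named "key"
df.selectExpr("stack (4, *) key")
  // take the id from each struct and emit one row per element of its value array
  .select(expr("key.id").as("key"),
    explode_outer(expr("key.value")).as("value"))
  .show(false)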
+----+-----+
|key |value|
+----+-----+
|2000|bird |
|2500|kitty|
|3200|cat  |
|3200|dog  |
|3650|horse|
+----+-----+
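stack(4, *) hard-codes the number of top-level keys. A minimal spark-shell-style sketch that derives the count from df.columns instead, so the same code keeps working if keys are added or removed. It assumes, as both answers do, that every top-level struct has the same id/value type (stack needs matching types across rows); stackExpr and result are illustrative names:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// One-line copy of the question's data (assumption: same content as above)
val jsonData = """{"3200":{"id":"3200","value":["cat","dog"]},"2000":{"id":"2000","value":["bird"]},"2500":{"id":"2500","value":["kitty"]},"3650":{"id":"3650","value":["horse"]}}"""
val df = spark.read.json(Seq(jsonData).toDS())

// stack's first argument must be a literal, so build the expression string
// with the current number of columns instead of a fixed 4
val stackExpr = s"stack(${df.columns.length}, *) as key"

val result = df
  .selectExpr(stackExpr)                     // one row per top-level struct
  .select($"key.id".as("key"),               // the struct's id
    explode_outer($"key.value").as("value")) // one row per array element

result.show(false)
```

explode_outer keeps a key whose value array is empty or null, emitting a null value for it; plain explode would drop such a key.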