Parsing JSON data with Spark 2.3



I have the following JSON data:

{
  "3200": {
    "id": "3200",
    "value": [
      "cat",
      "dog"
    ]
  },
  "2000": {
    "id": "2000",
    "value": [
      "bird"
    ]
  },
  "2500": {
    "id": "2500",
    "value": [
      "kitty"
    ]
  },
  "3650": {
    "id": "3650",
    "value": [
      "horse"
    ]
  }
}

After loading the data with Spark, the printSchema utility reports the following schema:

root
|-- 3200: struct (nullable = true)
|    |-- id: string (nullable = true)
|    |-- value: array (nullable = true)
|    |    |-- element: string (containsNull = true)
|-- 2000: struct (nullable = true)
|    |-- id: string (nullable = true)
|    |-- value: array (nullable = true)
|    |    |-- element: string (containsNull = true)
|-- 2500: struct (nullable = true)
|    |-- id: string (nullable = true)
|    |-- value: array (nullable = true)
|    |    |-- element: string (containsNull = true)
|-- 3650: struct (nullable = true)
|    |-- id: string (nullable = true)
|    |-- value: array (nullable = true)
|    |    |-- element: string (containsNull = true)
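For reference, a minimal sketch of how such a DataFrame and schema can be produced, assuming the JSON above is saved in a file (the path data.json is a placeholder); the multiLine option is needed because the document spans several lines:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// "data.json" is a placeholder path; multiLine lets Spark treat the
// whole file as one JSON document instead of one record per line
val df = spark.read.option("multiLine", true).json("data.json")
df.printSchema()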

From this I want to get the following dataframe:

id    value
3200  cat
2000  bird
2500  kitty
3200  dog
3650  horse 

How can I parse this to get the expected output?

Using spark-sql:

DataFrame step (same as in Mohana's answer):

val df = spark.read.json(Seq(jsonData).toDS())

Create a temporary view:

df.createOrReplaceTempView("df")

Result:

val cols_k = df.columns.map(x => s"`${x}`.id").mkString(",")
val cols_v = df.columns.map(x => s"`${x}`.value").mkString(",")
spark.sql(s"""
with t1 as ( select map_from_arrays(array(${cols_k}), array(${cols_v})) s from df ),
     t2 as ( select explode(s) as (key, value) from t1 )
select key, explode(value) as value from t2
""").show(false)
+----+-----+
|key |value|
+----+-----+
|2000|bird |
|2500|kitty|
|3200|cat  |
|3200|dog  |
|3650|horse|
+----+-----+
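One caveat: map_from_arrays was only added in Spark 2.4, so the query above will not run on the Spark 2.3 from the title. As a sketch of a 2.3-friendly variant, the same result can be produced with stack(), building the expression list the same way as cols_k/cols_v above (the pairs name is just illustrative):

// one "`col`.id, `col`.value" pair per top-level column
val pairs = df.columns.map(x => s"`${x}`.id, `${x}`.value").mkString(",")
spark.sql(s"""
select key, explode(value) as value from (
  select stack(${df.columns.length}, ${pairs}) as (key, value) from df
)
""").show(false)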

You can use the stack() function to transpose the dataframe, then extract the key field and explode the value field with the explode_outer function.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._

val jsonData = """{
  |  "3200": {
  |    "id": "3200",
  |    "value": [
  |      "cat",
  |      "dog"
  |    ]
  |  },
  |  "2000": {
  |    "id": "2000",
  |    "value": [
  |      "bird"
  |    ]
  |  },
  |  "2500": {
  |    "id": "2500",
  |    "value": [
  |      "kitty"
  |    ]
  |  },
  |  "3650": {
  |    "id": "3650",
  |    "value": [
  |      "horse"
  |    ]
  |  }
  |}
  |""".stripMargin

val df = spark.read.json(Seq(jsonData).toDS())
df.selectExpr("stack(4, *) key")
  .select(expr("key.id").as("key"),
          explode_outer(expr("key.value")).as("value"))
  .show(false)
+----+-----+
|key |value|
+----+-----+
|2000|bird |
|2500|kitty|
|3200|cat  |
|3200|dog  |
|3650|horse|
+----+-----+
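explode_outer is used here instead of plain explode so that an entry whose value array is empty or null would still produce a row (with a null value); for this particular input the two behave the same.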
