Reading JSON with a variable schema from a DataFrame column



I have a DataFrame with a column that contains nested JSON with a variable schema, i.e. the JSON in each row can have a different schema.

For example:

Key     Value
1       {"foo":"bar"}
2       {"key1":"val1","key2":"val2"}

I need to parse it and build a final DataFrame whose columns are the union of the keys across all rows' JSON, with each row carrying its respective values, like this:

Key     foo     key1        key2
1       bar     null        null
2       null    val1        val2
First, reproduce the input as a DataFrame:

val data = Seq(
  (1, """{"foo":"bar"}"""),
  (2, """{"key1":"val1","key2":"val2"}"""),
  (3, """{"key1":"val1","key3":"val3", "key4": "val4"}""")
)

val df = spark.createDataFrame(data).toDF("num", "keyvalue")
df.show(false)

Output:

+---+---------------------------------------------+
|num|keyvalue                                     |
+---+---------------------------------------------+
|1  |{"foo":"bar"}                                |
|2  |{"key1":"val1","key2":"val2"}                |
|3  |{"key1":"val1","key3":"val3", "key4": "val4"}|
+---+---------------------------------------------+

Convert the values in the keyvalue JSON column into a Scala Map. Call the result mapped_df:

import scala.util.parsing.json._
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._

// Parse each JSON string into a Map[String, String]. JSON.parseFull is
// deprecated and .get throws on malformed input, but it is fine for these
// simple flat string-to-string objects.
val stringToMap = udf((str: String) => JSON.parseFull(str).get.asInstanceOf[Map[String, String]])
val mapped_df = df.withColumn("mapped", stringToMap(col("keyvalue")))
mapped_df.show(false)

Output (mapped_df):

+---+---------------------------------------------+------------------------------------------+
|num|keyvalue                                     |mapped                                    |
+---+---------------------------------------------+------------------------------------------+
|1  |{"foo":"bar"}                                |[foo -> bar]                              |
|2  |{"key1":"val1","key2":"val2"}                |[key1 -> val1, key2 -> val2]              |
|3  |{"key1":"val1","key3":"val3", "key4": "val4"}|[key1 -> val1, key3 -> val3, key4 -> val4]|
+---+---------------------------------------------+------------------------------------------+
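If you would rather avoid the UDF and the deprecated scala.util.parsing.json package, here is a minimal alternative sketch, assuming Spark 2.2+ where from_json accepts a MapType (the name mapped_df_alt is mine):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{MapType, StringType}

// from_json yields null (rather than throwing) for rows with malformed JSON.
val mapped_df_alt = df.withColumn(
  "mapped",
  from_json(col("keyvalue"), MapType(StringType, StringType))
)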

Build the new DataFrame schema by collecting all the unique keys from the mapped column above:

// Start the schema with the key column, then add one nullable StringType
// field per distinct JSON key found across all rows.
var schema = List(StructField("number", IntegerType))
val col_rdd = mapped_df.select(col("mapped")).rdd.map { row =>
  row.getAs[Map[String, String]]("mapped").keys
}
// Note: toSet discards the sorted order, so the column order is not
// deterministic; it stays consistent because the same schem set drives
// both the schema and the row construction below.
val schem = col_rdd.flatMap(identity).collect().sorted.toSet
val new_schema = schem.toList.map(key => StructField(key, StringType, nullable = true))
schema = schema ++ new_schema

Output:

schema: List[org.apache.spark.sql.types.StructField] = List(
StructField(number,IntegerType,true), 
StructField(key4,StringType,true), 
StructField(key1,StringType,true), 
StructField(key2,StringType,true), 
StructField(key3,StringType,true), 
StructField(foo,StringType,true))
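The same key collection can also be done without dropping to the RDD API. A sketch, assuming Spark 2.3+ where map_keys is available (the keys value is my own name):

import org.apache.spark.sql.functions.{explode, map_keys}

// Explode each row's map keys into one row per key, then deduplicate.
val keys = mapped_df
  .select(explode(map_keys(col("mapped"))).as("key"))
  .distinct()
  .collect()
  .map(_.getString(0))
  .sorted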

Now that the schema is built, convert mapped_df to an RDD and transform each row so that it conforms to the new schema:

// Rebuild each row as: the original num, followed by one value per key in
// schem (null where that key is absent from the row's map). Iterating the
// same schem set keeps the values aligned with the schema built above.
val df_rdd = mapped_df.rdd.map { row =>
  val num = List(row.getAs[Int]("num"))
  val map_val = row.getAs[Map[String, String]]("mapped")
  val new_cols = schem.toList.map(key => map_val.getOrElse(key, null))
  Row.fromSeq(num ++ new_cols)
}
val new_dataframe = spark.createDataFrame(df_rdd, StructType(schema))

new_dataframe.show(false)

The new DataFrame alongside the given one:

New DataFrame:
+------+----+----+----+----+----+
|number|key4|key1|key2|key3| foo|
+------+----+----+----+----+----+
|     1|null|null|null|null| bar|
|     2|null|val1|val2|null|null|
|     3|val4|val1|null|val3|null|
+------+----+----+----+----+----+
Given DataFrame:
+---+---------------------------------------------+
|num|keyvalue                                     |
+---+---------------------------------------------+
|1  |{"foo":"bar"}                                |
|2  |{"key1":"val1","key2":"val2"}                |
|3  |{"key1":"val1","key3":"val3", "key4": "val4"}|
+---+---------------------------------------------+
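For what it's worth, the last two steps can also be expressed purely in the DataFrame API: getItem on a map column returns null for missing keys, so each collected key can be selected straight out of the map. A sketch reusing schem from above (new_dataframe_alt is my own name):

// One column per collected key, pulled directly from the map column.
val cols = col("num").as("number") +:
  schem.toList.map(key => col("mapped").getItem(key).as(key))
val new_dataframe_alt = mapped_df.select(cols: _*)
new_dataframe_alt.show(false)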

Thanks!
