我有一个Dataframe,它有一列包含一些嵌套的JSON和可变模式。即,每行中的JSON都有不同的模式。
例如
Key Value
1 {"foo":"bar"}
2 {"key1":"val1","key2":"val2"}
我需要解析它,并创建一个最终的数据帧,其中包含根据JSON模式组合的所有列,这些列形成所有行及其各自的值,如下所示。
Key foo key1 key2
1 bar null null
2 null val1 val2
val data = Seq((1, """{"foo":"bar"}"""),
(2, """{"key1":"val1","key2":"val2"}"""),
(3, """{"key1":"val1","key3":"val3", "key4": "val4"}"""))
val df = spark.createDataFrame(
data
).toDF("num", "keyvalue")
df.show()
输出:
+---+---------------------------------------------+
|num|keyvalue |
+---+---------------------------------------------+
|1 |{"foo":"bar"} |
|2 |{"key1":"val1","key2":"val2"} |
|3 |{"key1":"val1","key3":"val3", "key4": "val4"}|
+---+---------------------------------------------+
将keyvalue json对象中的值转换为scala映射对象。让我们称之为mapped_df
import scala.util.parsing.json._
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._
val stringToMap = udf((str: String) => JSON.parseFull(str).get.asInstanceOf[Map[String, String]])
val mapped_df =df.withColumn("mapped", stringToMap(col("keyvalue")))
mapped_df.show(false)
输出(mapped_df(:-
+---+---------------------------------------------+------------------------------------------+
|num|keyvalue |mapped |
+---+---------------------------------------------+------------------------------------------+
|1 |{"foo":"bar"} |[foo -> bar] |
|2 |{"key1":"val1","key2":"val2"} |[key1 -> val1, key2 -> val2] |
|3 |{"key1":"val1","key3":"val3", "key4": "val4"}|[key1 -> val1, key3 -> val3, key4 -> val4]|
+---+---------------------------------------------+------------------------------------------+
通过从上方的映射列中收集所有唯一密钥来创建新的数据帧架构
var schema = List(StructField("number", IntegerType))
val col_rdd = mapped_df.select(col("mapped")).rdd.map(x => {
val maps: Map[String, String] = x.getAs[Map[String, String]]("mapped")
val m = maps.map(x => x._1)
m
})
val schem = col_rdd.flatMap(x => x).collect().sorted.toSet
val new_schema = schem.toList.map(x => StructField(x, StringType, true))
schema = schema ++ new_schema
输出-
schema: List[org.apache.spark.sql.types.StructField] = List(
StructField(number,IntegerType,true),
StructField(key4,StringType,true),
StructField(key1,StringType,true),
StructField(key2,StringType,true),
StructField(key3,StringType,true),
StructField(foo,StringType,true))
现在我们创建了Schema。将mapped_df转换为rdd,并对其进行以下操作,使其与我们的新模式一致:
val df_rdd = mapped_df.rdd.map(row => {
val num = List(row.getAs[Int]("num"))
val map_val: Map[String, String] = row.getAs[Map[String, String]]("mapped")
val new_cols = schem.toList.map(x => map_val.getOrElse(x, null))
Row.fromSeq(num ++ new_cols)
})
val new_dataframe = spark.createDataFrame(df_rdd, StructType(schema))
new_dataframe.show(false)
新数据帧与给定数据帧
New Dataframe
+------+----+----+----+----+----+
|number|key4|key1|key2|key3| foo|
+------+----+----+----+----+----+
| 1|null|null|null|null| bar|
| 2|null|val1|val2|null|null|
| 3|val4|val1|null|val3|null|
+------+----+----+----+----+----+
Given Dataframe:
+---+---------------------------------------------+
|num|keyvalue |
+---+---------------------------------------------+
|1 |{"foo":"bar"} |
|2 |{"key1":"val1","key2":"val2"} |
|3 |{"key1":"val1","key3":"val3", "key4": "val4"}|
+---+---------------------------------------------+
谢谢!