I have a Spark DataFrame that looks like this:
[Row(id = '1', dictField={"keyA":"valueA","keyB":"valueB"}),
Row(id = '2', dictField={"keyC":"valueC","keyD":"valueD","keyA":"valueA"}),
Row(id = '3', dictField={"keyZ":"valueZ","keyA":"valueA"})]
I'm trying to explode it into the following format:
+---+----+------+
| id| key| value|
+---+----+------+
|  1|keyA|valueA|
|  1|keyB|valueB|
|  2|keyC|valueC|
|  2|keyD|valueD|
|  2|keyA|valueA|
|  3|keyZ|valueZ|
|  3|keyA|valueA|
+---+----+------+
Note: the keys are not predefined or known in advance.
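If your column is valid JSON, you can use Spark's JSON parsing functions (e.g. get_json_object) to solve the problem. Keep in mind that get_json_object extracts the value at a JSON path you name, so when the keys are unknown, from_json with a MapType schema is usually the better fit.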
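Outside Spark, the same idea can be sketched in plain Python: parse each dictField string with a real JSON parser (here the standard-library json module, standing in for Spark's JSON functions) and emit one (id, key, value) tuple per entry, so no key has to be known up front. The function name flatten_json_rows is hypothetical; this is only an illustration of the reshaping, not Spark code.

```python
import json
from typing import Iterable, List, Tuple


def flatten_json_rows(rows: Iterable[Tuple[str, str]]) -> List[Tuple[str, str, str]]:
    """Turn (id, JSON-object string) pairs into (id, key, value) rows.

    Keys are not known in advance: every key present in the parsed
    object becomes its own output row, in document order.
    """
    result = []
    for row_id, dict_field in rows:
        parsed = json.loads(dict_field)  # dicts keep insertion order (Python 3.7+)
        for key, value in parsed.items():
            result.append((row_id, key, value))
    return result


rows = [
    ("1", '{"keyA":"valueA","keyB":"valueB"}'),
    ("2", '{"keyC":"valueC","keyD":"valueD","keyA":"valueA"}'),
    ("3", '{"keyZ":"valueZ","keyA":"valueA"}'),
]
for r in flatten_json_rows(rows):
    print(r)
```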
I used a UDF and wrote the code in Spark Scala; you can use the following code as a reference. I assumed this schema:
root
 |-- id: string (nullable = true)
 |-- dictField: string (nullable = true)
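import spark.implicits._
import org.apache.spark.sql.functions.{explode, udf}

// Triple-quoted strings so the embedded JSON quotes need no escaping
val df = Seq(
  ("1", """{"keyA":"valueA","keyB":"valueB"}"""),
  ("2", """{"keyC":"valueC","keyD":"valueD","keyA":"valueA"}"""),
  ("3", """{"keyZ":"valueZ","keyA":"valueA"}""")
).toDF("id", "dictField")
df.show(false)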
+---+-------------------------------------------------+
|id |dictField                                        |
+---+-------------------------------------------------+
|1  |{"keyA":"valueA","keyB":"valueB"}                |
|2  |{"keyC":"valueC","keyD":"valueD","keyA":"valueA"}|
|3  |{"keyZ":"valueZ","keyA":"valueA"}                |
+---+-------------------------------------------------+
// Create a UDF that parses the string into Array[(String, String)].
// Naive parsing: assumes no key or value contains {, }, ", a comma or a colon.
def parse_str(value: String): Array[(String, String)] = {
  val values = value.replace("{", "").replace("}", "").replace("\"", "").split(",").map(_.trim)
  values.map { pair =>
    val Array(k, v) = pair.split(":")
    (k, v)
  }
}
//register udf
val parsed_udf = udf(parse_str _)
//use above udf and explode the array
val result = df.withColumn("parse",explode(parsed_udf($"dictField")))
//select required value
result.select($"id", $"parse._1".as("key"), $"parse._2".as("value")).show()
+---+----+------+
| id| key| value|
+---+----+------+
| 1|keyA|valueA|
| 1|keyB|valueB|
| 2|keyC|valueC|
| 2|keyD|valueD|
| 2|keyA|valueA|
| 3|keyZ|valueZ|
| 3|keyA|valueA|
+---+----+------+
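The UDF works on this sample because no key or value contains a brace, quote, comma or colon. A rough Python port of the same string-splitting idea (naive_parse is our name, not from the answer) shows how it breaks once a value contains a comma, while a real JSON parser handles it. In the Scala version the pattern `val Array(k, v)` would fail the same way, with a MatchError instead of Python's ValueError.

```python
import json
from typing import List, Tuple


def naive_parse(value: str) -> List[Tuple[str, str]]:
    """Python port of the parse_str idea: strip braces and quotes,
    split on ',' into pairs, then split each pair on ':'."""
    cleaned = value.replace("{", "").replace("}", "").replace('"', "")
    pairs = []
    for item in cleaned.split(","):
        k, v = item.strip().split(":")  # raises ValueError if not exactly 2 parts
        pairs.append((k, v))
    return pairs


ok = '{"keyA":"valueA","keyB":"valueB"}'
tricky = '{"keyA":"a,b","keyB":"valueB"}'

print(naive_parse(ok))                   # [('keyA', 'valueA'), ('keyB', 'valueB')]
print(list(json.loads(tricky).items()))  # [('keyA', 'a,b'), ('keyB', 'valueB')]
try:
    naive_parse(tricky)
except ValueError as exc:
    print("naive parser failed:", exc)
```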
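A possible working solution: parse the string into a MapType column with from_json, then apply the explode function from the Spark SQL functions API, which produces one row per key/value pair of the map.
Create the DataFrame: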
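from pyspark.sql import functions as F, types as T

# str() of a dict yields single-quoted keys and values; Spark's JSON parser
# accepts single quotes by default (allowSingleQuotes), so from_json still works.
df_new = spark.createDataFrame([str({"val1": "3", "val11": "31"}), str({"val2": "4", "val22": "44"})], T.StringType())
df_new.show(truncate=False)
Output: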
+----------------------------+
|value |
+----------------------------+
|{'val1': '3', 'val11': '31'}|
|{'val2': '4', 'val22': '44'}|
+----------------------------+
The logic:
df_new = df_new.withColumn('col', F.from_json("value",T.MapType(T.StringType(), T.StringType())))
df_new = df_new.select("col", F.explode("col").alias("x", "y"))
df_new.show(truncate=False)
Output:
+------------------------+-----+---+
|col |x |y |
+------------------------+-----+---+
|[val1 -> 3, val11 -> 31]|val1 |3 |
|[val1 -> 3, val11 -> 31]|val11|31 |
|[val2 -> 4, val22 -> 44]|val2 |4 |
|[val2 -> 4, val22 -> 44]|val22|44 |
+------------------------+-----+---+
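The map is repeated on every output row because the select keeps `col` next to each exploded (key, value) pair; selecting only the id and the exploded columns gives exactly the id/key/value shape from the question. The plain-Python model below (the function explode_map is hypothetical, not a Spark API) makes that row-per-entry behavior explicit and also models one detail worth knowing: explode drops rows whose map is null or empty, while explode_outer keeps them with a null key and value.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[Optional[Dict[str, str]], Optional[str], Optional[str]]


def explode_map(maps: List[Optional[Dict[str, str]]], outer: bool = False) -> List[Row]:
    """Model of select("col", explode("col").alias("x", "y")).

    Each map yields one row per entry, with the map itself repeated.
    With outer=True (like explode_outer), a null or empty map yields a
    single row whose key and value are None instead of being dropped.
    """
    rows: List[Row] = []
    for m in maps:
        if not m:  # None or {}
            if outer:
                rows.append((m, None, None))
            continue
        for key, value in m.items():
            rows.append((m, key, value))
    return rows


data = [{"val1": "3", "val11": "31"}, {"val2": "4", "val22": "44"}, {}]
for row in explode_map(data):
    print(row)
print(len(explode_map(data, outer=True)))  # 5
```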