Parsing dict key:value with PySpark (keys are not predefined)

I have a Spark DataFrame that looks like:

[Row(id = '1', dictField={"keyA":"valueA","keyB":"valueB"}), 
Row(id = '2', dictField={"keyC":"valueC","keyD":"valueD","keyA":"valueA"}),   
Row(id = '3', dictField={"keyZ":"valueZ","keyA":"valueA"})]

I am trying to explode it into the following format:

+---------+-----------+---------+
|       id|       key |  value  |
+---------+-----------+---------+
|        1|       keyA|  valueA |
|        1|       keyB|  valueB |
|        2|       keyC|  valueC |
|        2|       keyD|  valueD |
|        2|       keyA|  valueA |
|        3|       keyZ|  valueZ |
|        3|       keyA|  valueA |
+---------+-----------+---------+

Note: the keys are not predefined/known.

If your column is in proper JSON format, you can use a JSON parser (get_json_object) to solve the problem.
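
For illustration, a minimal sketch of that approach (assuming the question's df with an id column and a JSON-string dictField); note that get_json_object takes an explicit JSONPath per key, which is exactly why it does not help when the keys are unknown:

from pyspark.sql import functions as F

# Extract one explicitly named key; "$.keyA" is a JSONPath expression,
# so this only works because the key name is hard-coded
df.select("id", F.get_json_object("dictField", "$.keyA").alias("keyA")).show()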

I used a UDF and wrote the code in Spark Scala; you can refer to the code below to solve the problem. I used the given schema:

|-- id: string (nullable = true)
|-- dictField: string (nullable = true)

val df = Seq(
  ("1", """{"keyA":"valueA","keyB":"valueB"}"""),
  ("2", """{"keyC":"valueC","keyD":"valueD","keyA":"valueA"}"""),
  ("3", """{"keyZ":"valueZ","keyA":"valueA"}""")
).toDF("id", "dictField")
df.show(false)
+---+-------------------------------------------------+
|id |dictField                                        |
+---+-------------------------------------------------+
|1  |{"keyA":"valueA","keyB":"valueB"}                |
|2  |{"keyC":"valueC","keyD":"valueD","keyA":"valueA"}|
|3  |{"keyZ":"valueZ","keyA":"valueA"}                |
+---+-------------------------------------------------+
// UDF to parse the dict-like string into an Array[(String, String)]
def parse_str(value: String) = {
  val values = value.replace("{", "").replace("}", "").replace("\"", "").split(",").map(_.trim)
  values.foldLeft(Array[(String, String)]()) {
    case (acc, present) =>
      val Array(k, v) = present.split(":")
      acc :+ (k, v)
  }
}
// Register the UDF
val parsed_udf = udf(parse_str _)
// Apply the UDF and explode the resulting array
val result = df.withColumn("parse", explode(parsed_udf($"dictField")))
// Select the required columns
result.select($"id", $"parse._1".as("key"), $"parse._2".as("value")).show()
+---+----+------+
| id| key| value|
+---+----+------+
|  1|keyA|valueA|
|  1|keyB|valueB|
|  2|keyC|valueC|
|  2|keyD|valueD|
|  2|keyA|valueA|
|  3|keyZ|valueZ|
|  3|keyA|valueA|
+---+----+------+
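
Since the question asks for PySpark, roughly the same idea can also be sketched there (a loose translation of the Scala answer above, assuming dictField is a JSON-formatted string column of df; parse_str and the struct field names are illustrative, not from the original answer):

from pyspark.sql import functions as F, types as T

# Parse the dict-like string into an array of (key, value) structs
@F.udf(T.ArrayType(T.StructType([
    T.StructField("key", T.StringType()),
    T.StructField("value", T.StringType()),
])))
def parse_str(value):
    pairs = value.strip("{}").replace('"', "").split(",")
    return [tuple(p.strip().split(":", 1)) for p in pairs]

# Explode the array so each key/value pair becomes its own row
result = df.withColumn("parse", F.explode(parse_str("dictField")))
result.select("id", "parse.key", "parse.value").show()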

Using MapType and then the explode function provided by the Spark API could be an effective solution.

Create the DataFrame here:

from pyspark.sql import functions as F, types as T

df_new = spark.createDataFrame(
    [str({"val1": "3", "val11": "31"}), str({"val2": "4", "val22": "44"})],
    T.StringType(),
)
df_new.show(truncate=False)
# Output
+----------------------------+
|value                       |
+----------------------------+
|{'val1': '3', 'val11': '31'}|
|{'val2': '4', 'val22': '44'}|
+----------------------------+

The logic here:

# Parse each string into a map<string,string>, then explode it into key/value rows
df_new = df_new.withColumn('col', F.from_json("value", T.MapType(T.StringType(), T.StringType())))
df_new = df_new.select("col", F.explode("col").alias("x", "y"))
df_new.show(truncate=False)

Output:

+------------------------+-----+---+
|col                     |x    |y  |
+------------------------+-----+---+
|[val1 -> 3, val11 -> 31]|val1 |3  |
|[val1 -> 3, val11 -> 31]|val11|31 |
|[val2 -> 4, val22 -> 44]|val2 |4  |
|[val2 -> 4, val22 -> 44]|val22|44 |
+------------------------+-----+---+
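
Applied to the original question's schema (a sketch assuming the df from the question, with an id column and a JSON-string dictField), the same from_json + explode pattern would look like:

from pyspark.sql import functions as F, types as T

# Parse dictField into a map<string,string>, then explode into one row per entry
result = (
    df.withColumn("map", F.from_json("dictField", T.MapType(T.StringType(), T.StringType())))
      .select("id", F.explode("map").alias("key", "value"))
)
result.show()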
