Pyspark: explode a list of dictionaries and group them by a dictionary key



I have a dataframe whose column contains a list of dictionaries, and I want to split each dictionary out into its own row, keyed by one of its values.

Sample data:

[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},{"col.1":"13ABC","col.2":"141","col.3":"","col.4":"ABCD"}]

Input dataframe:

+---------------------------------------------------------------------------------------------------------------------------+
|ID|DATASET                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------+
|4A|[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},{"col.1":"13ABC","col.2":"141","col.3":"","col.4":"ABCD"}]   |
|4B|[]                                                                                                                      |
+---------------------------------------------------------------------------------------------------------------------------+

Expected result:

+--+-------+-------------------------------------------+
|ID|col_1  |col                                        |
+--+-------+-------------------------------------------+
|4A|"12ABC"|"{"col.2":"141","col.3":"","col.4":"ABCD"}"|
|4A|"13ABC"|"{"col.2":"141","col.3":"","col.4":"ABCD"}"|
|4B|""     |""                                         |
+--+-------+-------------------------------------------+
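To make the intended transformation concrete, here is a plain-Python sketch of what the expected result does per row (this is an illustration of the logic, not the Spark code itself; the sample values are copied from the tables above): parse `DATASET`, emit one row per dictionary keyed by `col.1`, and re-serialize the remaining keys as a JSON string.

```python
import json

# (ID, DATASET) rows, taken from the input dataframe above.
rows = [
    ("4A", '[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},'
           '{"col.1":"13ABC","col.2":"141","col.3":"","col.4":"ABCD"}]'),
    ("4B", "[]"),
]

result = []
for row_id, dataset in rows:
    dicts = json.loads(dataset)
    if not dicts:
        # Empty lists still produce a row, with empty col_1 and col values.
        result.append((row_id, "", ""))
        continue
    for d in dicts:
        col_1 = d.pop("col.1")  # the grouping key
        result.append((row_id, col_1, json.dumps(d, separators=(",", ":"))))

for r in result:
    print(r)
```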

I tried creating a schema for the DATASET column and separating the data, but I am not sure how to group and merge the rows based on the col.1 value:

schema = spark.read.json(df.rdd.map(lambda row: row.DATASET)).schema

I also referred to "Convert a Pyspark dataframe into a dictionary".

Thanks in advance.

Edit:

df2.withColumn("col", f.to_json(f.struct("`col.1`","`col.2`"))).show(truncate=False)

Result:

+---+-----+-----+-----+-----+-------------------------------+
|ID |col.1|col.2|col.3|col.4|col                            |
+---+-----+-----+-----+-----+-------------------------------+
|4A |12ABC|141  |     |ABCD |{"col.1":"12ABC","col.2":"141"}|
|4A |13ABC|141  |     |ABCD |{"col.1":"13ABC","col.2":"141"}|
+---+-----+-----+-----+-----+-------------------------------+
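Note that the edit above serializes `col.1` and `col.2`, while the expected `col` value contains everything *except* `col.1`. In plain-Python terms, the key set passed to `f.struct(...)` would need to exclude `col.1` (a sketch of the idea, using the sample row from the tables above):

```python
import json

d = {"col.1": "12ABC", "col.2": "141", "col.3": "", "col.4": "ABCD"}

# Keep every key except "col.1" before serializing, mirroring which
# columns the struct on the Spark side would need to cover.
rest = {k: v for k, v in d.items() if k != "col.1"}
compact = json.dumps(rest, separators=(",", ":"))
print(compact)
# {"col.2":"141","col.3":"","col.4":"ABCD"}
```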

I tried the following approach and it works. I am not sure whether it is an optimized approach; open to input and improvements.

Input df:

>>> df.show(truncate=False)
+--------------------------------------------------------------------------------------------------------------------+---+-----+-----+
|DATASET                                                                                                             |ID |count|level|
+--------------------------------------------------------------------------------------------------------------------+---+-----+-----+
|[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},{"col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"}]|4A |156  |36   |
|[]                                                                                                                  |4B |179  |258  |
|[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"}]                                                         |4C |222  |158  |
|[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},{"col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"}]  |4D |222  |158  |
+--------------------------------------------------------------------------------------------------------------------+---+-----+-----+

# Splitting list of dictionaries into rows
>>> df2 = df.withColumn("data", explode(split(regexp_replace(col("DATASET"), r"(^\[)|(\]$)", ""), ", "))).withColumn("data", explode(split("data", "},"))).withColumn("data", explode(split(regexp_replace(col("data"), r"(^\{)|(\}$)", ""), ", ")))
>>> df2 = df2.drop(df2.DATASET)
>>> df2.show(truncate=False)
+---+-----+-----+-------------------------------------------------------+
|ID |count|level|data                                                   |
+---+-----+-----+-------------------------------------------------------+
|4A |156  |36   |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|
|4A |156  |36   |"col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD" |
|4B |179  |258  |                                                       |
|4C |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|
|4D |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|
|4D |222  |158  |"col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"   |
+---+-----+-----+-------------------------------------------------------+
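The regexp_replace/split chain above does the following, shown here in plain Python for clarity (the `re` patterns mirror the Spark ones, with the brackets and braces escaped): strip the enclosing `[` `]`, split on `"},"` to separate the dictionaries, then strip the leftover braces from each piece.

```python
import re

# DATASET value for row 4A, from the input table above.
dataset = ('[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},'
           '{"col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"}]')

# 1. Strip the enclosing square brackets.
stripped = re.sub(r"(^\[)|(\]$)", "", dataset)
# 2. Split on "}," to separate the dictionaries, then
# 3. strip the remaining curly braces from each piece.
pieces = [re.sub(r"(^\{)|(\}$)", "", p) for p in stripped.split("},")]
for p in pieces:
    print(p)
```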

#Getting col.1 value from the separated dict
>>> col_1 = F.split(df2['data'],',')
>>> df3 = df2.withColumn('col_1', col_1.getItem(0))
>>> df3.show(truncate=False)
+---+-----+-----+-------------------------------------------------------+---------------+
|ID |count|level|data                                                   |col_1          |
+---+-----+-----+-------------------------------------------------------+---------------+
|4A |156  |36   |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"col.1":"12ABC"|
|4A |156  |36   |"col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD" |"col.1":"13ABC"|
|4B |179  |258  |                                                       |               |
|4C |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"col.1":"12ABC"|
|4D |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"col.1":"12ABC"|
|4D |222  |158  |"col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"   |"col.1":"12ABC"|
+---+-----+-----+-------------------------------------------------------+---------------+

#Getting value of col.1
>>> col_1 = F.split(df3['col_1'],':')
>>> df3 =df3.withColumn('col_1',col_1.getItem(1)).drop(df3.col_1)
>>> df3.show(truncate=False)
+---+-----+-----+-------------------------------------------------------+-------+
|ID |count|level|data                                                   |col_1  |
+---+-----+-----+-------------------------------------------------------+-------+
|4A |156  |36   |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"12ABC"|
|4A |156  |36   |"col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD" |"13ABC"|
|4B |179  |258  |                                                       |null   |
|4C |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"12ABC"|
|4D |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"12ABC"|
|4D |222  |158  |"col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"   |"12ABC"|
+---+-----+-----+-------------------------------------------------------+-------+
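The two `F.split(...).getItem(...)` steps above correspond to the following string operations in plain Python: take the first comma-separated entry, then take the part after the colon (the quotes around the value are kept, matching the output shown).

```python
# One "data" value from the df2 output above.
token = '"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"'

entry = token.split(",")[0]   # first entry: '"col.1":"12ABC"'
value = entry.split(":")[1]   # part after the colon: '"12ABC"'
print(value)
```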

#Grouping data by ID and col_1 columns
>>> grouping_cols = ["col_1", "ID"]
>>> grp_df = df3.groupBy(grouping_cols).agg(collect_list("data").alias("dataset"))
>>> grp_df.show(truncate=False)
+-------+---+---------------------------------------------------------------------------------------------------------------+
|col_1  |ID |dataset                                                                                                        |
+-------+---+---------------------------------------------------------------------------------------------------------------+
|"13ABC"|4A |["col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"]                                                       |
|null   |4B |[]                                                                                                             |
|"12ABC"|4D |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD", "col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"]|
|"12ABC"|4A |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
|"12ABC"|4C |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
+-------+---+---------------------------------------------------------------------------------------------------------------+
>>> grp_df = grp_df.selectExpr("col_1 as grp_col", "ID as ID","dataset as dataset")
>>> grp_df.show(truncate=False)
+-------+---+---------------------------------------------------------------------------------------------------------------+
|grp_col|ID |dataset                                                                                                        |
+-------+---+---------------------------------------------------------------------------------------------------------------+
|"13ABC"|4A |["col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"]                                                       |
|null   |4B |[]                                                                                                             |
|"12ABC"|4D |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD", "col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"]|
|"12ABC"|4A |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
|"12ABC"|4C |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
+-------+---+---------------------------------------------------------------------------------------------------------------+
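The groupBy/collect_list step amounts to the following plain-Python grouping: collect every `data` string under its `(col_1, ID)` pair (the values here are abbreviated stand-ins for the full strings in the df3 output above).

```python
from collections import defaultdict

# (ID, col_1, data) triples, abbreviated from the df3 output.
rows = [
    ("4A", '"12ABC"', "data-A1"),
    ("4A", '"13ABC"', "data-A2"),
    ("4D", '"12ABC"', "data-D1"),
    ("4D", '"12ABC"', "data-D2"),
]

grouped = defaultdict(list)
for row_id, col_1, data in rows:
    grouped[(col_1, row_id)].append(data)

for key, dataset in grouped.items():
    print(key, dataset)
```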

# joining grouped df with main dataframe and removing dupes
>>> final_df = df3.join(grp_df,'ID').dropDuplicates().drop('data').drop('col_1')
>>> final_df.show(truncate=False)
+---+-----+-----+-------+---------------------------------------------------------------------------------------------------------------+
|ID |count|level|grp_col|dataset                                                                                                        |
+---+-----+-----+-------+---------------------------------------------------------------------------------------------------------------+
|4B |179  |258  |null   |[]                                                                                                             |
|4C |222  |158  |"12ABC"|["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
|4A |156  |36   |"13ABC"|["col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"]                                                       |
|4A |156  |36   |"12ABC"|["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
|4A |156  |36   |"13ABC"|["col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"]                                                       |
|4A |156  |36   |"12ABC"|["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
|4D |222  |158  |"12ABC"|["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD", "col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"]|
|4D |222  |158  |"12ABC"|["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD", "col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"]|
+---+-----+-----+-------+---------------------------------------------------------------------------------------------------------------+

Instead of regular expressions, we can use the posexplode and concat_ws options:

(
    df2.select(
        "*",
        F.posexplode(F.split("DATASET", ",")).alias("pos", "token")
    )
    .where("pos > 0")
    .groupBy("ID", "DATASET")
    .agg(F.concat_ws("_", F.collect_list("token")).alias("data_cols"))
    .select(
        "ID",
        F.split("DATASET", ",").getItem(0).alias("col_id"),
        "data_cols"
    )
    .show(truncate=False)
)
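The posexplode/concat_ws idea reduces to this in plain Python: split on commas, keep the token at position 0 as the id-carrying piece, and rejoin the tokens at `pos > 0` with `"_"`. Note that without the regex step the surrounding braces stay attached to the first and last tokens.

```python
# One DATASET element, from the sample data above.
dataset = '{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"}'

tokens = dataset.split(",")
col_id = tokens[0]                # token at pos 0 (leading "{" remains)
data_cols = "_".join(tokens[1:])  # tokens with pos > 0, joined with "_"
print(col_id)
print(data_cols)
```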
