Spark DataFrame | Merging multiple rows with missing values



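I have a DataFrame in which one column is a list of strings and another column contains the year. A few rows are missing a value in the year column.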


year       fields
2020       IFDSDEP.7
(missing)  IFDSDEP.7, IFDSIMP.51, IFDSIMP.52, IFDSIMP.54, IFDSIMP.60
2020       IFDSIMP.7, IFDSIMP.14, IFDSIMP.51, IFDSIMP.52, IFDSIMP.54

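If the fields column is a single string, you should probably first split it into an array of strings. That lets you combine the arrays from all rows and then reduce them to one list of unique values.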

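As for the nulls in the year column, you have to fill in the missing values. If the data contains more than one year, you need some rule for deciding which year each missing value should get.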
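One possible rule, sketched here in plain Python rather than Spark, is a forward fill: a row with no year inherits the most recent year seen above it. This only makes sense if the rows have a meaningful order, which the question does not confirm. The helper name forward_fill_year and the sample rows (including the year 2021) are hypothetical and exist only for illustration.

```python
from typing import List, Optional, Tuple

# A row is (year, fields); year may be missing (None).
Row = Tuple[Optional[int], str]


def forward_fill_year(rows: List[Row]) -> List[Row]:
    """Fill a missing year with the most recent non-null year above it.

    Assumption: the rows are already in a meaningful order.
    Leading rows with no earlier year stay None.
    """
    filled: List[Row] = []
    last_year: Optional[int] = None
    for year, fields in rows:
        if year is not None:
            last_year = year
        filled.append((last_year, fields))
    return filled


# Hypothetical sample data with two different years.
rows: List[Row] = [
    (2020, "IFDSDEP.7"),
    (None, "IFDSDEP.7,IFDSIMP.51"),
    (2021, "IFDSIMP.7"),
    (None, "IFDSIMP.14"),
]
filled_rows = forward_fill_year(rows)
print(filled_rows)
```

In Spark, a similar effect can usually be obtained with F.last("year", ignorenulls=True) over a window ordered by some column, but that requires an explicit ordering column, which the example data does not have.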

Once that is done, a groupBy should do the job.

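from pyspark.sql import DataFrame, SparkSession, functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# Your example DataFrame
df: DataFrame = spark.createDataFrame(data=[
    [2020, "IFDSDEP.7"],
    [None, "IFDSDEP.7,IFDSIMP.51,IFDSIMP.52,IFDSIMP.54,IFDSIMP.60"],
    [2020, "IFDSIMP.7,IFDSIMP.14,IFDSIMP.51,IFDSIMP.52,IFDSIMP.54"],
], schema=StructType([
    StructField("year", IntegerType()),
    StructField("fields", StringType())
])).cache()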
df.show(truncate=False)
+----+-----------------------------------------------------+
|year|fields                                               |
+----+-----------------------------------------------------+
|2020|IFDSDEP.7                                            |
|null|IFDSDEP.7,IFDSIMP.51,IFDSIMP.52,IFDSIMP.54,IFDSIMP.60|
|2020|IFDSIMP.7,IFDSIMP.14,IFDSIMP.51,IFDSIMP.52,IFDSIMP.54|
+----+-----------------------------------------------------+
# Replace null with 2020 for `year` column
df = df.fillna({"year": 2020})
df.show(truncate=False)
+----+-----------------------------------------------------+
|year|fields                                               |
+----+-----------------------------------------------------+
|2020|IFDSDEP.7                                            |
|2020|IFDSDEP.7,IFDSIMP.51,IFDSIMP.52,IFDSIMP.54,IFDSIMP.60|
|2020|IFDSIMP.7,IFDSIMP.14,IFDSIMP.51,IFDSIMP.52,IFDSIMP.54|
+----+-----------------------------------------------------+
# Transforming the fields column
df = df.withColumn("fields", F.split(F.col("fields"), ","))
df.show(truncate=False)
+----+-----------------------------------------------------------+
|year|fields                                                     |
+----+-----------------------------------------------------------+
|2020|[IFDSDEP.7]                                                |
|2020|[IFDSDEP.7, IFDSIMP.51, IFDSIMP.52, IFDSIMP.54, IFDSIMP.60]|
|2020|[IFDSIMP.7, IFDSIMP.14, IFDSIMP.51, IFDSIMP.52, IFDSIMP.54]|
+----+-----------------------------------------------------------+
# Aggregate on year and collect all arrays of fields then combine them all and make them distinct
df_agg = df.groupby("year").agg(F.array_distinct(F.flatten(F.collect_list("fields"))))
df_agg.show(truncate=False)
+----+----------------------------------------------------------------------------------+
|year|array_distinct(flatten(collect_list(fields)))                                     |
+----+----------------------------------------------------------------------------------+
|2020|[IFDSDEP.7, IFDSIMP.51, IFDSIMP.52, IFDSIMP.54, IFDSIMP.60, IFDSIMP.7, IFDSIMP.14]|
+----+----------------------------------------------------------------------------------+

Breaking down the last part of the code:

F.collect_list("fields"): collects all the fields arrays of each group (keyed by year). You should now have an array of arrays:

[[IFDSDEP.7], [IFDSDEP.7, IFDSIMP.51, IFDSIMP.52, IFDSIMP.54, IFDSIMP.60], [IFDSIMP.7, IFDSIMP.14, IFDSIMP.51, IFDSIMP.52, IFDSIMP.54]]

F.flatten(): this function flattens the sub-arrays into one big array:

[IFDSDEP.7, IFDSDEP.7, IFDSIMP.51, IFDSIMP.52, IFDSIMP.54, IFDSIMP.60, IFDSIMP.7, IFDSIMP.14, IFDSIMP.51, IFDSIMP.52, IFDSIMP.54]

F.array_distinct(): this function removes duplicate values from the array, which gives the result you expect:

[IFDSDEP.7, IFDSIMP.51, IFDSIMP.52, IFDSIMP.54, IFDSIMP.60, IFDSIMP.7, IFDSIMP.14]
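The three steps above can be mimicked in plain Python to check the intermediate values by hand. This is only a model of the logic, not Spark code. It assumes that duplicates are dropped in first-occurrence order, which matches the output shown above; note that Spark does not guarantee the order in which collect_list gathers rows after a shuffle.

```python
from itertools import chain

# Step 1: what collect_list("fields") gathers for the year 2020 group.
collected = [
    ["IFDSDEP.7"],
    ["IFDSDEP.7", "IFDSIMP.51", "IFDSIMP.52", "IFDSIMP.54", "IFDSIMP.60"],
    ["IFDSIMP.7", "IFDSIMP.14", "IFDSIMP.51", "IFDSIMP.52", "IFDSIMP.54"],
]

# Step 2: flatten the array of arrays into one array (duplicates remain).
flattened = list(chain.from_iterable(collected))

# Step 3: drop duplicates, keeping the first occurrence of each value.
distinct = list(dict.fromkeys(flattened))

print(len(flattened))
print(distinct)
```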
