PySpark:过滤并集的重复项,只保留指定列的最大值为groupby行



我想创建一个DataFrame,它包含两个DataFrame中的所有行,并且在有重复的地方,我们只保留列的最大值的行。

例如,如果我们有两个具有相同模式的表,如下所示,我们将合并到一个表中,该表仅包括由另一列分组的行组(以下示例中的"名称"(中具有最大列值(最高分数(的行。

Table A
+--------------------------+
| name   | source  | score |
+--------+---------+-------+
| Finch  | Acme    | 62    |
| Jones  | Acme    | 30    |
| Lewis  | Acme    | 59    |
| Smith  | Acme    | 98    |
| Starr  | Acme    | 87    |
+--------+---------+-------+

Table B
+--------------------------+
| name   | source  | score |
+--------+---------+-------+
| Bryan  | Beta    | 93    |
| Jones  | Beta    | 75    |
| Lewis  | Beta    | 59    |
| Smith  | Beta    | 64    |
| Starr  | Beta    | 81    |
+--------+---------+-------+

Final Table
+--------------------------+
| name   | source  | score |
+--------+---------+-------+
| Bryan  | Beta    | 93    |
| Finch  | Acme    | 62    |
| Jones  | Beta    | 75    |
| Lewis  | Acme    | 59    |
| Smith  | Acme    | 98    |
| Starr  | Acme    | 87    |
+--------+---------+-------+

以下是似乎有效的:

from pyspark.sql import functions as F
schema = ["name", "source", "score"]
rows1 = [("Smith", "Acme", 98),
("Jones", "Acme", 30),
("Finch", "Acme", 62),
("Lewis", "Acme", 59),
("Starr", "Acme", 87)]
rows2 = [("Smith", "Beta", 64),
("Jones", "Beta", 75),
("Bryan", "Beta", 93),
("Lewis", "Beta", 59),
("Starr", "Beta", 81)]
df1 = spark.createDataFrame(rows1, schema)
df2 = spark.createDataFrame(rows2, schema)
df_union = df1.unionAll(df2)
df_agg = df_union.groupBy("name").agg(F.max("score").alias("score"))
df_final = df_union.join(df_agg, on="score", how="leftsemi").orderBy("name", F.col("score").desc()).dropDuplicates(["name"])

上面的结果就是我所期望的DataFrame。这似乎是一种复杂的方法,但我不知道,因为我对Spark相对陌生。这能以更高效、优雅或";Pythonic";方式

您可以使用窗口函数。按name进行分区,然后选择score最高的记录。

from pyspark.sql.functions import *
from pyspark.sql.window import Window
w=Window().partitionBy("name").orderBy(desc("score"))
df_union.withColumn("rank", row_number().over(w))
.filter(col("rank")==1).drop("rank").show()
+-----+------+-----+                                                            
| name|source|score|
+-----+------+-----+
|Bryan|  Beta|   93|
|Finch|  Acme|   62|
|Jones|  Beta|   75|
|Lewis|  Acme|   59|
|Smith|  Acme|   98|
|Starr|  Acme|   87|
+-----+------+-----+

我认为你的答案没有错,除了最后一行——你不能只在分数上加入,而是需要在"name";以及";得分";,您可以选择内部联接,这将消除删除同一名称的得分较低的行的需要:

df_final = (df_union.join(df_agg, on=["name", "score"], how="inner")
.orderBy("name")
.dropDuplicates(["name"]))

请注意,不需要按分数排序,只有当您想避免为name=Lewis显示两行时,才需要.dropDuplicates(["name"](,Lewis在两个数据帧中都有相同的分数。

最新更新