我想创建一个DataFrame,它包含两个DataFrame中的所有行,并且在有重复的地方,我们只保留列的最大值的行。
例如,如果我们有两个具有相同模式的表,如下所示,我们将合并到一个表中,该表仅包括由另一列分组的行组(以下示例中的"名称"(中具有最大列值(最高分数(的行。
Table A
+--------------------------+
| name | source | score |
+--------+---------+-------+
| Finch | Acme | 62 |
| Jones | Acme | 30 |
| Lewis | Acme | 59 |
| Smith | Acme | 98 |
| Starr | Acme | 87 |
+--------+---------+-------+
Table B
+--------------------------+
| name | source | score |
+--------+---------+-------+
| Bryan | Beta | 93 |
| Jones | Beta | 75 |
| Lewis | Beta | 59 |
| Smith | Beta | 64 |
| Starr | Beta | 81 |
+--------+---------+-------+
Final Table
+--------------------------+
| name | source | score |
+--------+---------+-------+
| Bryan | Beta | 93 |
| Finch | Acme | 62 |
| Jones | Beta | 75 |
| Lewis | Acme | 59 |
| Smith | Acme | 98 |
| Starr | Acme | 87 |
+--------+---------+-------+
以下是似乎有效的:
from pyspark.sql import functions as F
schema = ["name", "source", "score"]
rows1 = [("Smith", "Acme", 98),
("Jones", "Acme", 30),
("Finch", "Acme", 62),
("Lewis", "Acme", 59),
("Starr", "Acme", 87)]
rows2 = [("Smith", "Beta", 64),
("Jones", "Beta", 75),
("Bryan", "Beta", 93),
("Lewis", "Beta", 59),
("Starr", "Beta", 81)]
df1 = spark.createDataFrame(rows1, schema)
df2 = spark.createDataFrame(rows2, schema)
df_union = df1.unionAll(df2)
df_agg = df_union.groupBy("name").agg(F.max("score").alias("score"))
df_final = df_union.join(df_agg, on="score", how="leftsemi").orderBy("name", F.col("score").desc()).dropDuplicates(["name"])
上面的结果就是我所期望的DataFrame。这似乎是一种复杂的方法,但我不知道,因为我对Spark相对陌生。这能以更高效、优雅或";Pythonic";方式
您可以使用窗口函数。按name
进行分区,然后选择score
最高的记录。
from pyspark.sql.functions import *
from pyspark.sql.window import Window
w=Window().partitionBy("name").orderBy(desc("score"))
df_union.withColumn("rank", row_number().over(w))
.filter(col("rank")==1).drop("rank").show()
+-----+------+-----+
| name|source|score|
+-----+------+-----+
|Bryan| Beta| 93|
|Finch| Acme| 62|
|Jones| Beta| 75|
|Lewis| Acme| 59|
|Smith| Acme| 98|
|Starr| Acme| 87|
+-----+------+-----+
我认为你的答案没有错,除了最后一行——你不能只在分数上加入,而是需要在"name";以及";得分";,您可以选择内部联接,这将消除删除同一名称的得分较低的行的需要:
df_final = (df_union.join(df_agg, on=["name", "score"], how="inner")
.orderBy("name")
.dropDuplicates(["name"]))
请注意,不需要按分数排序,只有当您想避免为name=Lewis显示两行时,才需要.dropDuplicates(["name"](,Lewis在两个数据帧中都有相同的分数。