我有一个数据集ds
,如下所示:
ds.show():
id1 | id2 | id3 | value |
1 | 1 | 2 | tom |
1 | 1 | 2 | tim |
1 | 3 | 2 | tom |
1 | 3 | 2 | tom |
2 | 1 | 2 | mary |
我想删除给定键(id1,id2,id3)
的所有重复行(即第1行和第2行(,但同时只保留一行用于具有相同值的重复行(如第3行和第4行(。预期输出为:
id1 | id2 | id3 | value |
1 | 3 | 2 | tom |
2 | 1 | 2 | mary |
在这里,我应该删除第1行和第2行,因为键组有2个值。但我们只保留第3行和第4行的一行,因为值相同(而不是删除这两行(
我尝试使用来实现这一点
val df = Seq(
(1, 1, 2, "tom"),
(1, 1, 2, "tim"),
(1, 3, 2, "tom"),
(1, 3, 2, "tom"),
(2, 1, 2, "mary")
).toDF("id1", "id2", "id3", "value")
val window = Window.partitionBy("id1", "id2", "id3")
df.withColumn("count", count("value").over(window))
.filter($"count" < 2)
.drop("count")
.show(false)
以下是相关问题:Spark:删除所有重复的线路
但它并没有按预期工作,因为它将删除所有重复的行。
我想这样做的原因是加入另一个数据集,当我们为同一个密钥组有多个名称时,不添加来自该数据集的信息
您可以在分组前删除重复项,这将为您提供如下所示的单个记录
df.dropDuplicates()
.withColumn("count", count("value").over(window))
.filter($"count" < 2)
.drop("count")
.show(false)
您还可以将要检查是否重复的字段指定为
df.dropDuplicates("id1", "id2", "id3", "value")
.withColumn("count", count("value").over(window))
.filter($"count" < 2)
.drop("count")
.show(false)
输出:
+---+---+---+-----+
|id1|id2|id3|value|
+---+---+---+-----+
|1 |3 |2 |tom |
|2 |1 |2 |mary |
+---+---+---+-----+
当重复时,您可以distinct
来获取唯一一行。
df.distinct
.withColumn("count", count("value").over(window))
.filter($"count" < 2)
.drop("count")
.show(false)
+---+---+---+-----+
|id1|id2|id3|value|
+---+---+---+-----+
|1 |3 |2 |tom |
|2 |1 |2 |mary |
+---+---+---+-----+
您也可以使用groupBy
方法。
df.groupBy("id1", "id2", "id3", "value")
.agg(first("col1").as("col1"), ...)
.withColumn("count", count("value").over(window))
.filter($"count" < 2)
.drop("count")
.show(false)