问题是关于使用数据帧,我想删除完全重复的记录,不包括某些字段(日期(。 我尝试使用窗口函数(WindowSpec(作为:
val wFromDupl: WindowSpec = Window
.partitionBy(comparateFields: _*)
.orderBy(asc(orderField))
在变量comparateFields中,我存储了我必须检查的所有字段(在示例中,它将是DESC1和DESC2(以消除重复项,其逻辑是,如果有重复的记录,我们将丢弃日期较高的记录。
在 orderField 变量中,我只是存储effective_date字段。
因此,通过应用 window 函数,我所做的是计算一个临时列,为所有重复的记录分配最小日期,然后将 dataFrame 过滤为:
val dfFinal: DataFrame = dfInicial
.withColumn("w_eff_date", min(col("effective_date")).over(wFromDupl))
.filter(col("effective_date") === col("w_eff_date"))
.drop("w_eff_date")
.distinct()
.withColumn("effective_end_date", lead(orderField, 1, "9999-12-31").over(w))
对于以下情况,它可以正常工作:
KEY EFFECTIVE_DATE DESC 1 DESC 2 W_EFF_DATE (tmp)
E2 2000 A B 2000
E2 2001 A B 2000
E2 2002 AA B 2002
代码将删除第二条记录:
E2 2001 A B 2000
但是逻辑必须应用于连续记录(日期(,例如,对于以下情况,在实现代码时,我们正在删除第三条记录(DESC1 和 DESC2 相同,最小有效日期是 2000(,但我们不希望这样做,因为我们(eff_date(中间有一条记录(2001 AA B(所以我们想保留 3 条记录
KEY EFFECTIVE_DATE DESC1 DESC2 W_EFF_DATE (tmp)
E1 2000 A B 2000
E1 2001 AA B 2001
E1 2002 A B 2000
对此有什么建议吗? 谢谢大家!
一种方法是使用when/otherwise
和 Window 函数lag
来确定要保留哪些行,如下所示:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = Seq(
("E1", "2000", "A", "B"),
("E1", "2001", "AA", "B"),
("E1", "2002", "A", "B"),
("E1", "2003", "A", "B"),
("E1", "2004", "A", "B"),
("E2", "2000", "C", "D"),
("E2", "2001", "C", "D"),
("E2", "2002", "CC", "D"),
("E2", "2003", "C", "D")
).toDF("key", "effective_date", "desc1", "desc2")
val compareCols = List("desc1", "desc2")
val win1 = Window.partitionBy("key").orderBy("effective_date")
val df2 = df.
withColumn("compCols", struct(compareCols.map(col): _*)).
withColumn("rowNum", row_number.over(win1)).
withColumn("toKeep",
when($"rowNum" === 1 || $"compCols" =!= lag($"compCols", 1).over(win1), true).
otherwise(false)
)
// +---+--------------+-----+-----+--------+------+------+
// |key|effective_date|desc1|desc2|compCols|rowNum|toKeep|
// +---+--------------+-----+-----+--------+------+------+
// | E1| 2000| A| B| [A,B]| 1| true|
// | E1| 2001| AA| B| [AA,B]| 2| true|
// | E1| 2002| A| B| [A,B]| 3| true|
// | E1| 2003| A| B| [A,B]| 4| false|
// | E1| 2004| A| B| [A,B]| 5| false|
// | E2| 2000| C| D| [C,D]| 1| true|
// | E2| 2001| C| D| [C,D]| 2| false|
// | E2| 2002| CC| D| [CC,D]| 3| true|
// | E2| 2003| C| D| [C,D]| 4| true|
// +---+--------------+-----+-----+--------+------+------+
df2.where($"toKeep").select(df.columns.map(col): _*).
show
// +---+--------------+-----+-----+
// |key|effective_date|desc1|desc2|
// +---+--------------+-----+-----+
// | E1| 2000| A| B|
// | E1| 2001| AA| B|
// | E1| 2002| A| B|
// | E2| 2000| C| D|
// | E2| 2002| CC| D|
// | E2| 2003| C| D|
// +---+--------------+-----+-----+