Spark Scala:从数据帧中删除连续(按日期)重复记录



问题是关于使用数据帧,我想删除完全重复的记录,不包括某些字段(日期(。 我尝试使用窗口函数(WindowSpec(作为:

val wFromDupl: WindowSpec = Window
.partitionBy(comparateFields: _*)
.orderBy(asc(orderField))

在变量comparateFields中,我存储了我必须检查的所有字段(在示例中,它将是DESC1和DESC2(以消除重复项,其逻辑是,如果有重复的记录,我们将丢弃日期较高的记录。

在 orderField 变量中,我只是存储effective_date字段。

因此,通过应用 window 函数,我所做的是计算一个临时列,为所有重复的记录分配最小日期,然后将 dataFrame 过滤为:

val dfFinal: DataFrame = dfInicial
.withColumn("w_eff_date", min(col("effective_date")).over(wFromDupl))
.filter(col("effective_date") === col("w_eff_date")) 
.drop("w_eff_date")
.distinct()
.withColumn("effective_end_date", lead(orderField, 1, "9999-12-31").over(w))

对于以下情况,它可以正常工作:

KEY EFFECTIVE_DATE  DESC 1  DESC 2  W_EFF_DATE (tmp)
E2  2000            A       B       2000
E2  2001            A       B       2000
E2  2002            AA      B       2002

代码将删除第二条记录:

E2  2001            A       B       2000

但是逻辑必须应用于连续记录(日期(,例如,对于以下情况,在实现代码时,我们正在删除第三条记录(DESC1 和 DESC2 相同,最小有效日期是 2000(,但我们不希望这样做,因为我们(eff_date(中间有一条记录(2001 AA B(所以我们想保留 3 条记录

KEY EFFECTIVE_DATE  DESC1   DESC2   W_EFF_DATE (tmp)
E1     2000         A       B       2000
E1     2001         AA      B       2001
E1     2002         A       B       2000

对此有什么建议吗? 谢谢大家!

一种方法是使用when/otherwise和 Window 函数lag来确定要保留哪些行,如下所示:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = Seq(
("E1", "2000", "A",  "B"),
("E1", "2001", "AA", "B"),
("E1", "2002", "A",  "B"),
("E1", "2003", "A",  "B"),
("E1", "2004", "A",  "B"),
("E2", "2000", "C",  "D"),
("E2", "2001", "C",  "D"),
("E2", "2002", "CC", "D"),
("E2", "2003", "C",  "D")
).toDF("key", "effective_date", "desc1", "desc2")
val compareCols = List("desc1", "desc2")
val win1 = Window.partitionBy("key").orderBy("effective_date")
val df2 = df.
withColumn("compCols", struct(compareCols.map(col): _*)).
withColumn("rowNum", row_number.over(win1)).
withColumn("toKeep",
when($"rowNum" === 1 || $"compCols" =!= lag($"compCols", 1).over(win1), true).
otherwise(false)
)
// +---+--------------+-----+-----+--------+------+------+
// |key|effective_date|desc1|desc2|compCols|rowNum|toKeep|
// +---+--------------+-----+-----+--------+------+------+
// | E1|          2000|    A|    B|   [A,B]|     1|  true|
// | E1|          2001|   AA|    B|  [AA,B]|     2|  true|
// | E1|          2002|    A|    B|   [A,B]|     3|  true|
// | E1|          2003|    A|    B|   [A,B]|     4| false|
// | E1|          2004|    A|    B|   [A,B]|     5| false|
// | E2|          2000|    C|    D|   [C,D]|     1|  true|
// | E2|          2001|    C|    D|   [C,D]|     2| false|
// | E2|          2002|   CC|    D|  [CC,D]|     3|  true|
// | E2|          2003|    C|    D|   [C,D]|     4|  true|
// +---+--------------+-----+-----+--------+------+------+
df2.where($"toKeep").select(df.columns.map(col): _*).
show
// +---+--------------+-----+-----+
// |key|effective_date|desc1|desc2|
// +---+--------------+-----+-----+
// | E1|          2000|    A|    B|
// | E1|          2001|   AA|    B|
// | E1|          2002|    A|    B|
// | E2|          2000|    C|    D|
// | E2|          2002|   CC|    D|
// | E2|          2003|    C|    D|
// +---+--------------+-----+-----+

最新更新