删除spark dataframe中一系列行的最新日期



我有以下数据框架:

+----------+----------+--------------------+--------------------+---------+
|  fs_date |   ss_date|       request      |            response|full_date|
+----------+----------+--------------------+--------------------+---------+
|2022-06-01|2022-06-02|[[[TLV, NYC, 2022...|[[[false, [1262.1...|2022-5-25|
|2022-06-01|2022-06-03|[[[TLV, NYC, 2022...|[[[false, [1226.6...|2022-5-28|
|2022-06-01|2022-06-03|[[[TLV, NYC, 2022...|[[[false, [3746.6...|2022-5-28|
|2022-06-01|2022-06-04|[[[TLV, NYC, 2022...|[[[false, [878.63...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [777.81...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [746.58...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [777.81...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [695.28...|2022-5-26|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [593.63...|2022-5-25|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [687.28...|2022-5-29|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [687.28...|2022-5-28|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [687.28...|2022-5-28|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [687.38...|2022-5-26|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [789.88...|2022-5-25|

对于每个日期组合,我只想得到最新的回复(按"完整日期")。

例如,对于|2022-06-01|2022-06-05|我只想要来自2022-5-29的响应。

for 2022-06-01|2022-06-03 only 2022-5-28,等等

预期输出:

+----------+----------+--------------------+--------------------+---------+
|  fs_date |   ss_date|       request      |            response|full_date|
+----------+----------+--------------------+--------------------+---------+
|2022-06-01|2022-06-02|[[[TLV, NYC, 2022...|[[[false, [1262.1...|2022-5-25|
|2022-06-01|2022-06-03|[[[TLV, NYC, 2022...|[[[false, [1226.6...|2022-5-28|
|2022-06-01|2022-06-03|[[[TLV, NYC, 2022...|[[[false, [3746.6...|2022-5-28|
|2022-06-01|2022-06-04|[[[TLV, NYC, 2022...|[[[false, [878.63...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [777.81...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [746.58...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [777.81...|2022-5-29|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [687.28...|2022-5-29|

谢谢!

这是一个很好的例子,说明了什么时候使用窗口函数——一个可以在聚合中操作的函数。

date_window = W.partitionBy(["fs_date", "ss_date"]).orderBy(F.col("full_date").desc())
df2 = (
df.withColumn("row", F.dense_rank().over(date_window))
.filter(F.col("row") == 1).drop("row")
)

我们创建一个分区来定义我们将要使用的窗口,然后我们执行dense_rank(这将对full_date中的值进行排序,同时在有平局的地方重复数字),然后我们过滤排名最高的行并删除我们的窗口列。

假设您的表称为data,其想法是首先创建另一个表,我们按fs_datess_date分组,然后在full_date上最大聚合。这样我们就得到了full_date值最大的所有行:

val otherOne = data.groupBy("fs_date", "ss_date").agg(max("full_date").as("full_date"))

在此步骤之后,我们再次与主表inner连接(因此我们可以过滤掉不需要的行),如:

data.join(otherOne, Seq("fs_date", "ss_date", "full_date"), "inner")
.orderBy("fs_date", "ss_date") // for the sake of matching results

这应该给你你想要的!解决方案是在Scala中完成的,但我希望你能明白。

输出的样例结果(也有样例输入数据):

+----------+----------+---------+-------+--------+
|   fs_date|   ss_date|full_date|request|response|
+----------+----------+---------+-------+--------+
|2022-06-01|2022-06-03|2022-5-28|      R|       R|
|2022-06-01|2022-06-03|2022-5-28|      R|       R|
|2022-06-01|2022-06-04|2022-5-29|      R|       R|
|2022-06-01|2022-06-05|2022-5-29|      R|       R|
|2022-06-01|2022-06-05|2022-5-29|      R|       R|
|2022-06-01|2022-06-05|2022-5-29|      R|       R|
|2022-06-01|2022-06-06|2022-5-29|      R|       R|
+----------+----------+---------+-------+--------+

最新更新