我想根据当前输入行来约束聚合函数中窗框中的哪些行。例如,给定数据框df
和一个窗口w
,我希望能够做类似的事情:
df2 = df.withColumn("foo", first(col("bar").filter(...)).over(w))
.filter
将根据框架的输入行从当前窗框中删除行。
我的特定用例如下:给定数据框df
+-----+--+--+
|group|n1|n2|
+-----+--+--+
| 1| 1| 6|
| 1| 0| 3|
| 1| 2| 2|
| 1| 3| 5|
| 2| 0| 5|
| 2| 0| 7|
| 2| 3| 2|
| 2| 5| 9|
+-----+--+--+
窗口
w = Window.partitionBy("group")
.orderBy("n1", "n2")
.rowsBetween(Window.currentRow + 1, Window.unboundedFollowing)
和一些正长的i
,您如何在每个输入行r
的框架中找到第一行(fr
),从而使r.n1
<fr.n1
,r.n2
<fr.n2
和MAX(fr.n1
-r.n1
,fr.n2
-r.n2
)<i
?返回的值可以是fr.n1
或df
中的fr
的行索引。因此,对于i
= 6,示例df
的输出为
+-----+--+--+-----+
|group|n1|n2|fr.n1|
+-----+--+--+-----+
| 1| 1| 6| null|
| 1| 0| 3| 1|
| 1| 2| 2| 3|
| 1| 3| 5| null|
| 2| 0| 5| 5|
| 2| 0| 7| 5|
| 2| 3| 2| null|
| 2| 5| 9| null|
+-----+--+--+-----+
我一直在研究Spark API,并首先查看窗户的示例,但我似乎无法将其拼凑在一起。这是通过窗口和聚合功能的,还是我完全脱离商标?
您将无法使用窗口函数和聚合来完成此操作,您需要一个自我加入:加入:
df = sc.parallelize([[1, 1, 6],[1, 0, 3],[1, 2, 2],[1, 3, 5],[2, 0, 5],[2, 0, 7],[2, 3, 2],[2, 5, 9]]).toDF(["group","n1","n2"])
import pyspark.sql.functions as psf
df_r = df.select([df[c].alias("r_" + c) for c in df.columns])
df_join = df_r
.join(df, (df_r.r_group == df.group)
& (df_r.r_n1 < df.n1)
& (df_r.r_n2 < df.n2)
& (psf.greatest(df.n1 - df_r.r_n1, df.n2 - df_r.r_n2) < i), "leftouter")
.drop("group")
现在我们可以应用窗口函数以保持第一行:
w = Window.partitionBy("r_group", "r_n1", "r_n2").orderBy("n1", "n2")
res = df_join
.withColumn("rn", psf.row_number().over(w))
.filter("rn = 1").drop("rn")
+-------+----+----+----+----+
|r_group|r_n1|r_n2| n1| n2|
+-------+----+----+----+----+
| 1| 0| 3| 1| 6|
| 1| 1| 6|null|null|
| 1| 2| 2| 3| 5|
| 1| 3| 5|null|null|
| 2| 0| 5| 5| 9|
| 2| 0| 7| 5| 9|
| 2| 3| 2|null|null|
| 2| 5| 9|null|null|
+-------+----+----+----+----+