[皮]Spark SQL:使用框架的输入行约束窗口的每个帧



我想根据当前输入行来约束聚合函数中窗框中的哪些行。例如,给定数据框df和一个窗口w,我希望能够做类似的事情:

df2 = df.withColumn("foo", first(col("bar").filter(...)).over(w))

.filter将根据框架的输入行从当前窗框中删除行。

我的特定用例如下:给定数据框df

+-----+--+--+
|group|n1|n2| 
+-----+--+--+
|    1| 1| 6|
|    1| 0| 3|
|    1| 2| 2|
|    1| 3| 5|
|    2| 0| 5|
|    2| 0| 7|
|    2| 3| 2|
|    2| 5| 9|
+-----+--+--+

窗口

w = Window.partitionBy("group")
          .orderBy("n1", "n2")
          .rowsBetween(Window.currentRow + 1, Window.unboundedFollowing)

和一些正长的i,您如何在每个输入行r的框架中找到第一行(fr),从而使r.n1<fr.n1r.n2<fr.n2和MAX(fr.n1 -r.n1fr.n2 -r.n2)<i?返回的值可以是fr.n1df中的fr的行索引。因此,对于i = 6,示例df的输出为

+-----+--+--+-----+
|group|n1|n2|fr.n1|
+-----+--+--+-----+
|    1| 1| 6| null|
|    1| 0| 3|    1|
|    1| 2| 2|    3|
|    1| 3| 5| null|
|    2| 0| 5|    5|
|    2| 0| 7|    5|
|    2| 3| 2| null|
|    2| 5| 9| null|
+-----+--+--+-----+

我一直在研究Spark API,并首先查看窗户的示例,但我似乎无法将其拼凑在一起。这是通过窗口和聚合功能的,还是我完全脱离商标?

您将无法使用窗口函数和聚合来完成此操作,您需要一个自我加入:加入:

df = sc.parallelize([[1, 1, 6],[1, 0, 3],[1, 2, 2],[1, 3, 5],[2, 0, 5],[2, 0, 7],[2, 3, 2],[2, 5, 9]]).toDF(["group","n1","n2"])
import pyspark.sql.functions as psf
df_r = df.select([df[c].alias("r_" + c) for c in df.columns])
df_join = df_r
    .join(df, (df_r.r_group == df.group) 
        & (df_r.r_n1 < df.n1) 
        & (df_r.r_n2 < df.n2) 
        & (psf.greatest(df.n1 - df_r.r_n1, df.n2 - df_r.r_n2) < i), "leftouter")
    .drop("group")

现在我们可以应用窗口函数以保持第一行:

w = Window.partitionBy("r_group", "r_n1", "r_n2").orderBy("n1", "n2")
res = df_join
    .withColumn("rn", psf.row_number().over(w))
    .filter("rn = 1").drop("rn")
    +-------+----+----+----+----+
    |r_group|r_n1|r_n2|  n1|  n2|
    +-------+----+----+----+----+
    |      1|   0|   3|   1|   6|
    |      1|   1|   6|null|null|
    |      1|   2|   2|   3|   5|
    |      1|   3|   5|null|null|
    |      2|   0|   5|   5|   9|
    |      2|   0|   7|   5|   9|
    |      2|   3|   2|null|null|
    |      2|   5|   9|null|null|
    +-------+----+----+----+----+

相关内容

  • 没有找到相关文章

最新更新