How to use a window spec along with aggregate functions in Spark



I have a Spark dataframe that looks like this:

+------------+---------+------------------------------------------------------------+------------------------+-------------------+
|parent_key  |id       |value                                                       |raw_is_active           |updated_at         |
+------------+---------+------------------------------------------------------------+------------------------+-------------------+
|1           |2        |[, 0, USER, 2020-12-11 04:50:40, 2020-12-11 04:50:40,]      |[2020-12-11 04:50:40, 0]|2020-12-11 04:50:40|
|1           |2        |[testA, 0, USER, 2020-12-11 04:50:40, 2020-12-11 17:18:00,] |null                    |2020-12-11 17:18:00|
|1           |2        |[testA, 0, USER, 2020-12-11 04:50:40, 2020-12-11 17:19:56,] |null                    |2020-12-11 17:19:56|
|1           |2        |[testA, 1, USER, 2020-12-11 04:50:40, 2020-12-11 17:20:24,] |[2020-12-11 17:20:24, 1]|2020-12-11 17:20:24|
|2           |3        |[testB, 0, USER, 2020-12-11 17:24:03, 2020-12-11 17:24:03,] |[2020-12-11 17:24:03, 0]|2020-12-11 17:24:03|
|3           |4        |[testC, 0, USER, 2020-12-11 17:27:36, 2020-12-11 17:27:36,] |[2020-12-11 17:27:36, 0]|2020-12-11 17:27:36|
+------------+---------+------------------------------------------------------------+------------------------+-------------------+

The schema is:

root
|-- parent_key: long (nullable = true)
|-- id: string (nullable = true)
|-- value: struct (nullable = true)
|    |-- first_name: string (nullable = true)
|    |-- is_active: integer (nullable = true)
|    |-- source: string (nullable = true)
|    |-- created_at: timestamp (nullable = true)
|    |-- updated_at: timestamp (nullable = true)
|-- raw_is_active: struct (nullable = true)
|    |-- updated_at: timestamp (nullable = true)
|    |-- value: integer (nullable = true)
|-- updated_at: timestamp (nullable = true)

I am looking for an output like this:

+------------+---------+------------------------------------------------------------+---------------------------------------------------+-------------------+
|parent_key  |id       |value                                                       |raw_is_active                                      |updated_at         |
+------------+---------+------------------------------------------------------------+---------------------------------------------------+-------------------+
|1           |2        |[testA, 1, USER, 2020-12-11 04:50:40, 2020-12-11 17:20:24]  |[[2020-12-11 04:50:40, 0],[2020-12-11 17:20:24, 1]]|2020-12-11 04:50:40|
|2           |3        |[testB, 0, USER, 2020-12-11 17:24:03, 2020-12-11 17:24:03]  |[2020-12-11 17:24:03, 0]                           |2020-12-11 17:24:03|
|3           |4        |[testC, 0, USER, 2020-12-11 17:27:36, 2020-12-11 17:27:36]  |[2020-12-11 17:27:36, 0]                           |2020-12-11 17:27:36|
+------------+---------+------------------------------------------------------------+---------------------------------------------------+-------------------+

So, based on the updated_at column, I want to keep only the latest row for each id, and I also want to build an array of the raw_is_active values from all of that id's rows.

I know I can select the latest value with code like:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val windowSpec = Window.partitionBy("id").orderBy(col("updated_at").desc)

dataFrame
  .withColumn("maxTS", first("updated_at").over(windowSpec))
  .select("*").where(col("maxTS") === col("updated_at"))
  .drop("maxTS")

But I am not sure how to also build a collection of the raw_is_active values for that id.
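One possibility might be to compute both the latest timestamp and the collected set over a window that spans the whole id partition, then filter down to the latest row. A rough sketch only (fullSpec and withSet are placeholder names, and I have not verified this against the data above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// A window spec with no orderBy covers the entire id partition,
// so max and collect_set both see every row for that id.
val fullSpec = Window.partitionBy("id")

val withSet = dataFrame
  .withColumn("maxTS", max("updated_at").over(fullSpec))
  .withColumn("active_list", collect_set("raw_is_active").over(fullSpec))
  .where(col("maxTS") === col("updated_at"))
  .drop("maxTS", "raw_is_active")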

Alternatively, I could do it entirely with grouping, something like:

dataFrame
  .groupBy("parent_key", "id")
  .agg(collect_list("value") as "value_list", collect_set("raw_is_active") as "active_list")
  .withColumn("value", col("value_list")(size(col("value_list")).minus(1)))
  .drop("value_list")

For the approach above, I am not sure whether:

  1. .withColumn("value", col("value_list")(size(col("value_list")).minus(1))) will always give me the latest value (a deterministic alternative is sketched after this list)
  2. this code is efficient, given the use of collect_list and collect_set
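If the ordering of collect_list is not guaranteed after a shuffle, a deterministic alternative could be to take the max of a struct keyed by updated_at. A rough sketch (grouped is just a placeholder name):

import org.apache.spark.sql.functions._

// max on a struct compares field by field, so ordering by (updated_at, value)
// and reading back the "value" field returns the value of the latest row.
val grouped = dataFrame
  .groupBy("parent_key", "id")
  .agg(
    max(struct(col("updated_at"), col("value"))).getField("value").as("value"),
    collect_set("raw_is_active").as("active_list")
  )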

UPDATE: Thanks to @mck, I was able to get this working with the following code:

val windowSpec = Window.partitionBy("id").orderBy(col("updated_at").desc)
val windowSpecSet = Window.partitionBy("id").orderBy(col("updated_at"))

val df2 = dataFrame.withColumn(
  "rn",
  row_number().over(windowSpec)
).withColumn(
  "active_list",
  collect_set("raw_is_active").over(windowSpecSet)
).drop("raw_is_active").filter("rn = 1")

However, that code takes more time than my existing code:

dataFrame
  .groupBy("parent_key", "id")
  .agg(collect_list("value") as "value_list", collect_set("raw_is_active") as "active_list")
  .withColumn("value", col("value_list")(size(col("value_list")).minus(1)))
  .drop("value_list")

I was under the impression that a window function would perform better than groupBy with agg.
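To see where the time goes, the physical plans of the two versions can be compared. A quick check, assuming df2 is the window version from the UPDATE above and the groupBy/agg result is assigned to a value such as groupedDf (name illustrative):

// Inspect the physical plans to see where each version shuffles and sorts;
// the window version typically shows an extra Sort + Window step after the exchange.
df2.explain()
groupedDf.explain()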

Assign a row_number to each row within every id partition and filter for the rows with row_number = 1:

// Number the rows of each id partition from the latest updated_at downwards
val windowSpec = Window.partitionBy("id").orderBy(col("updated_at").desc)

val df2 = dataFrame.withColumn(
  "rn",
  row_number().over(windowSpec)
).withColumn(
  "active_list",
  // collect the raw_is_active structs seen by the window and sort the array
  array_sort(collect_set("raw_is_active").over(windowSpec))
).drop("raw_is_active").filter("rn = 1")  // keep only the latest row per id
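One detail worth noting: when a window spec has an orderBy, its default frame only runs from the start of the partition to the current row, so collect_set over windowSpec builds a running set rather than the full set for the id. If the complete set per id is wanted from a single window pass, the frame can be widened explicitly. A rough sketch along those lines (orderedSpec, fullWindow and df3 are placeholder names):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// row_number keeps its own ordered window (it does not accept a custom frame),
// while collect_set uses an explicit full-partition frame so it sees every row.
val orderedSpec = Window.partitionBy("id").orderBy(col("updated_at").desc)
val fullWindow = orderedSpec.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val df3 = dataFrame
  .withColumn("rn", row_number().over(orderedSpec))
  .withColumn("active_list", array_sort(collect_set("raw_is_active").over(fullWindow)))
  .drop("raw_is_active")
  .filter("rn = 1")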
