有没有一种方法可以在ApacheSpark中创建按事件时间排序的Window分区



我是Apache Spark的新手,正在与一个问题作斗争。我需要在同一频道/事件类型上查找连续事件的第一个事件。为了说明我的意思,用这个数据:

|eventtype|channel|      eventtime|
+---------+-------+---------------
Play|  1|2020-08-19 01:51:09|
Play|  1|2020-08-19 01:54:09|
Live|  1|2020-08-19 02:23:53|
Live|  2|2020-08-19 07:13:34|
Play|  1|2020-08-19 09:59:49|
Play|  1|2020-08-19 22:29:33|
Live|  2|2020-08-19 22:29:37|

我需要将其转换为:

|eventtype|channel|      eventtime|
+---------+-------+---------------
Play|  1|2020-08-19 01:51:09|
Live|  1|2020-08-19 02:23:53|
Live|  2|2020-08-19 07:13:34|
Play|  1|2020-08-19 09:59:49|
Live|  2|2020-08-19 22:29:37|

我的理解:

Window.partitionBy("channel", "eventtype").orderBy("eventtime")

它会将所有不同的通道/事件类型分组到分区中,然后抓住第一行就会产生这样的结果:

|eventtype|channel|      eventtime|
+---------+-------+---------------
Play|  1|2020-08-19 01:51:09|
Live|  1|2020-08-19 02:23:53|
Live|  2|2020-08-19 07:13:34|

有没有一种方法可以在Spark中使用Window函数或其他方法来实现这一点?

您的评估是正确的,通过";通道";以及";事件类型";将不会产生所需的输出,因为给定的CCD_;事件时间";以整洁的方式。以下是产生所需结果的一种方法:

val df = Seq(
("Play", 1, "2020-08-19 01:51:09"),
("Play", 1, "2020-08-19 01:54:09"),
("Live", 1, "2020-08-19 02:23:53"),
("Live", 2, "2020-08-19 07:13:34"),
("Play", 1, "2020-08-19 09:59:49"),
("Play", 1, "2020-08-19 22:29:33"),
("Live", 2, "2020-08-19 22:29:37")
).toDF("eventtype", "channel", "eventtime")
import org.apache.spark.sql.expressions.Window
val win = Window.orderBy("eventtime")
val keyCols = Seq(col("channel"), col("eventtype"))
df.
withColumn("row_num", row_number.over(win)).
withColumn("prev_keycols", lag(struct(keyCols: _*), 1).over(win)).
where($"row_num" === 1 || struct(keyCols: _*) =!= $"prev_keycols").
show
/*
+---------+-------+-------------------+-------+------------+
|eventtype|channel|          eventtime|row_num|prev_keycols|
+---------+-------+-------------------+-------+------------+
|     Play|      1|2020-08-19 01:51:09|      1|        null|
|     Live|      1|2020-08-19 02:23:53|      3|   {1, Play}|
|     Live|      2|2020-08-19 07:13:34|      4|   {1, Live}|
|     Play|      1|2020-08-19 09:59:49|      5|   {2, Live}|
|     Live|      2|2020-08-19 22:29:37|      7|   {1, Play}|
+---------+-------+-------------------+-------+------------+
*/

中间列保留在输出中以供引用。

您可以将window函数与row_numberfilter一起使用,如下所示

val window = Window.partitionBy("eventtype", "channel").orderBy("eventtime")
df.withColumn("rank", row_number().over(window))
.filter($"rank" === 1)
.drop("rank")
.show(false) 

输出:

+---------+-------+-------------------+
|eventtype|channel|eventtime          |
+---------+-------+-------------------+
|Live     |1      |2020-08-19 02:23:53|
|Play     |1      |2020-08-19 01:51:09|
|Play     |2      |2020-08-19 07:54:53|
|Live     |2      |2020-08-19 07:13:34|
|Play     |3      |2020-08-19 09:59:48|
+---------+-------+-------------------+

最终使用了以下内容

withColumn("prevchannel", lag("channel",1).over(window) 
and
where(startPrevValues("channel") =!= startPrevValues("prevchannel"))

查找通道更改的所有事件

我假设您希望在此处保留每个通道和事件类型的第一个事件时间。

spark中的Window函数返回窗口中的行的秩。

正如你所说的,你确实可以使用分区

val result_df = df
.withColumn("rank", rank.over(Window.partitionBy("eventtype","channel").orderBy("eventtype")
.filter(col("rank")===1)

最新更新