我是Apache Spark的新手,正在与一个问题作斗争。我需要在同一频道/事件类型上查找连续事件的第一个事件。为了说明我的意思,用这个数据:
|eventtype|channel| eventtime|
+---------+-------+---------------
Play| 1|2020-08-19 01:51:09|
Play| 1|2020-08-19 01:54:09|
Live| 1|2020-08-19 02:23:53|
Live| 2|2020-08-19 07:13:34|
Play| 1|2020-08-19 09:59:49|
Play| 1|2020-08-19 22:29:33|
Live| 2|2020-08-19 22:29:37|
我需要将其转换为:
|eventtype|channel| eventtime|
+---------+-------+---------------
Play| 1|2020-08-19 01:51:09|
Live| 1|2020-08-19 02:23:53|
Live| 2|2020-08-19 07:13:34|
Play| 1|2020-08-19 09:59:49|
Live| 2|2020-08-19 22:29:37|
我的理解:
Window.partitionBy("channel", "eventtype").orderBy("eventtime")
它会将所有不同的通道/事件类型分组到分区中,然后抓住第一行就会产生这样的结果:
|eventtype|channel| eventtime|
+---------+-------+---------------
Play| 1|2020-08-19 01:51:09|
Live| 1|2020-08-19 02:23:53|
Live| 2|2020-08-19 07:13:34|
有没有一种方法可以在Spark中使用Window函数或其他方法来实现这一点?
您的评估是正确的,通过";通道";以及";事件类型";将不会产生所需的输出,因为给定的CCD_;事件时间";以整洁的方式。以下是产生所需结果的一种方法:
val df = Seq(
("Play", 1, "2020-08-19 01:51:09"),
("Play", 1, "2020-08-19 01:54:09"),
("Live", 1, "2020-08-19 02:23:53"),
("Live", 2, "2020-08-19 07:13:34"),
("Play", 1, "2020-08-19 09:59:49"),
("Play", 1, "2020-08-19 22:29:33"),
("Live", 2, "2020-08-19 22:29:37")
).toDF("eventtype", "channel", "eventtime")
import org.apache.spark.sql.expressions.Window
val win = Window.orderBy("eventtime")
val keyCols = Seq(col("channel"), col("eventtype"))
df.
withColumn("row_num", row_number.over(win)).
withColumn("prev_keycols", lag(struct(keyCols: _*), 1).over(win)).
where($"row_num" === 1 || struct(keyCols: _*) =!= $"prev_keycols").
show
/*
+---------+-------+-------------------+-------+------------+
|eventtype|channel| eventtime|row_num|prev_keycols|
+---------+-------+-------------------+-------+------------+
| Play| 1|2020-08-19 01:51:09| 1| null|
| Live| 1|2020-08-19 02:23:53| 3| {1, Play}|
| Live| 2|2020-08-19 07:13:34| 4| {1, Live}|
| Play| 1|2020-08-19 09:59:49| 5| {2, Live}|
| Live| 2|2020-08-19 22:29:37| 7| {1, Play}|
+---------+-------+-------------------+-------+------------+
*/
中间列保留在输出中以供引用。
您可以将window
函数与row_number
和filter
一起使用,如下所示
val window = Window.partitionBy("eventtype", "channel").orderBy("eventtime")
df.withColumn("rank", row_number().over(window))
.filter($"rank" === 1)
.drop("rank")
.show(false)
输出:
+---------+-------+-------------------+
|eventtype|channel|eventtime |
+---------+-------+-------------------+
|Live |1 |2020-08-19 02:23:53|
|Play |1 |2020-08-19 01:51:09|
|Play |2 |2020-08-19 07:54:53|
|Live |2 |2020-08-19 07:13:34|
|Play |3 |2020-08-19 09:59:48|
+---------+-------+-------------------+
最终使用了以下内容
withColumn("prevchannel", lag("channel",1).over(window)
and
where(startPrevValues("channel") =!= startPrevValues("prevchannel"))
查找通道更改的所有事件
我假设您希望在此处保留每个通道和事件类型的第一个事件时间。
spark中的Window函数返回窗口中的行的秩。
正如你所说的,你确实可以使用分区
val result_df = df
.withColumn("rank", rank.over(Window.partitionBy("eventtype","channel").orderBy("eventtype")
.filter(col("rank")===1)