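I want to work out the winner of each minute based on the number of seconds watched within that minute. Below is a sample of start_timestamp and end_timestamp values. In the end, I need to assign each minute to the channel with the highest number of seconds in that minute.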
start_timestamp | end_timestamp | Channel
---|---|---
7:59:45 | 8:00:09 |
8:00:10 | 8:00:14 |
8:00:15 | 8:00:29 |
8:00:30 | 8:02:10 | C
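Just a warning: this may be a very poorly performing solution, depending on how long the gap between start and end is in each row. The idea is to generate, for each row, a sequence of timestamps one second apart, then group by a one-minute window and by channel and count the rows. The channel with the highest count in each window is that minute's winner.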
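import org.apache.spark.sql.functions._
import spark.implicits._  // for the $"..." syntax; assumes a SparkSession named `spark`

val df2 = df.selectExpr(
    "Channel",
    // one row per second watched, both endpoints included
    "explode(sequence(timestamp(start_timestamp), timestamp(end_timestamp), interval 1 second)) time"
  )
  .groupBy(window($"time", "1 minute"), $"Channel")
  .count()  // seconds watched per (minute, channel)
  .groupBy("window")
  // max over struct(count, Channel) keeps the highest count; ties go to the larger Channel value
  .agg(max(struct("count", "Channel")).as("s"))
  .select("window.*", "s.Channel")
  .orderBy("window")

df2.show(false)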
+-------------------+-------------------+-------+
|start              |end                |Channel|
+-------------------+-------------------+-------+
|2021-01-19 07:59:00|2021-01-19 08:00:00|A      |
|2021-01-19 08:00:00|2021-01-19 08:01:00|C      |
|2021-01-19 08:01:00|2021-01-19 08:02:00|C      |
|2021-01-19 08:02:00|2021-01-19 08:03:00|C      |
+-------------------+-------------------+-------+
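If the one-row-per-second expansion turns out to be too slow, one possible alternative is to compute each row's overlap with every minute it touches arithmetically, so the work grows with the number of minutes rather than the number of seconds. The following is only a sketch in plain Scala collections, not the Spark solution above: the names `Viewing`, `MinuteWinner`, `secondsInMinute` and `winners` are made up for illustration, and it assumes whole-second timestamps with both endpoints counted, matching the inclusive `sequence(...)` call.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.temporal.ChronoUnit

// Hypothetical record type; the fields mirror the question's columns.
final case class Viewing(start: LocalDateTime, end: LocalDateTime, channel: String)

object MinuteWinner {
  // Seconds of `v` that fall inside the minute beginning at `minute`,
  // counting both the first and the last second (inclusive endpoints).
  def secondsInMinute(v: Viewing, minute: LocalDateTime): Long = {
    val lastSecond = minute.plusSeconds(59)
    val from = if (v.start.isAfter(minute)) v.start else minute
    val to = if (v.end.isBefore(lastSecond)) v.end else lastSecond
    math.max(0L, Duration.between(from, to).getSeconds + 1)
  }

  // For every minute touched by at least one viewing, the channel with the
  // most seconds. Ties go to the larger channel name, as with
  // max(struct(count, Channel)).
  def winners(viewings: Seq[Viewing]): Seq[(LocalDateTime, String)] = {
    // One entry per (minute, channel) pair that a viewing overlaps.
    val perMinute = for {
      v <- viewings
      first = v.start.truncatedTo(ChronoUnit.MINUTES)
      last = v.end.truncatedTo(ChronoUnit.MINUTES)
      offset <- 0L to ChronoUnit.MINUTES.between(first, last)
      minute = first.plusMinutes(offset)
    } yield ((minute, v.channel), secondsInMinute(v, minute))

    perMinute
      .groupBy(_._1)
      .map { case (key, rows) => (key, rows.map(_._2).sum) } // total seconds per (minute, channel)
      .groupBy { case ((minute, _), _) => minute }
      .map { case (minute, totals) =>
        val ((_, channel), _) = totals.maxBy { case ((_, ch), secs) => (secs, ch) }
        (minute, channel)
      }
      .toSeq
      .sortWith((a, b) => a._1.isBefore(b._1))
  }
}
```

As a made-up example, a viewing of channel X from 08:00:00 to 08:00:39 followed by a viewing of channel Y from 08:00:40 to 08:01:29 gives X 40 seconds and Y 20 seconds in the 08:00 minute, and Y 30 seconds in the 08:01 minute, so `winners` assigns 08:00 to X and 08:01 to Y. The same overlap arithmetic could in principle be expressed in Spark by exploding a per-minute sequence instead of a per-second one, but that variant is not shown or tested here.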