Spark window function with multiple columns



I have this dataframe showing the send time and open time for each user:

val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),             
("user2", "2018-04-05 18:00:00", null),
("user2", "2018-04-05 19:00:00", null)              
).toDF("id", "sendTime", "openTime")

+-----+-------------------+-------------------+
|   id|           sendTime|           openTime|
+-----+-------------------+-------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|
|user2|2018-04-05 18:00:00|               null|
|user2|2018-04-05 19:00:00|               null|
+-----+-------------------+-------------------+

Now, for each user, I want to count how many opens occurred within the two hours before each send time. I'm using a window function partitioned by user, but I don't know how to compare the values of the sendTime column against the openTime column. The resulting dataframe should look like this:

+-----+-------------------+-------------------+-----+
|   id|           sendTime|           openTime|count|
+-----+-------------------+-------------------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|    0|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|    1|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|    2|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|    2|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|    0|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|    1|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|    2|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|    2|
|user2|2018-04-05 18:00:00|               null|    3|
|user2|2018-04-05 19:00:00|               null|    2|
+-----+-------------------+-------------------+-----+

This is what I have so far, but it doesn't give me what I need:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => F}

var df2 = df.withColumn("sendUnix", F.unix_timestamp($"sendTime"))
  .withColumn("openUnix", F.unix_timestamp($"openTime"))
val w = Window.partitionBy($"id").orderBy($"sendUnix").rangeBetween(-2*60*60, 0)
df2 = df2.withColumn("count", F.count($"openUnix").over(w))

This seems hard to do with window functions alone, because the window frame is anchored to the ordering column: you cannot reference the current row's sendTime as an upper bound when deciding whether a given openTime falls within the two hours before it.
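As an aside, the two-column comparison can also be done without window functions at all, via a non-equi self-join. Below is a minimal sketch (not part of the answer that follows), assuming the df2 with the sendUnix/openUnix columns built in the question; note that non-equi joins like this can get expensive on large data.

import org.apache.spark.sql.functions._

// Pair each send with the same user's opens from the preceding two hours,
// then count the matches per send; the left join keeps sends with no match,
// and count() ignores the null oUnix those rows carry, giving them 0.
val opens = df2.select($"id".as("oid"), $"openUnix".as("oUnix")).where($"oUnix".isNotNull)
val selfJoined = df2
  .join(opens, $"id" === $"oid" && $"oUnix" < $"sendUnix" && $"oUnix" > $"sendUnix" - 7200, "left_outer")
  .groupBy($"id", $"sendTime", $"openTime")
  .agg(count($"oUnix").as("count"))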

With the advent of Spark 2.4 come higher-order functions, which you can read about here: https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html. Using these, you can collect all of the openTime values within the window with collect_list, then use the higher-order filter function to drop the openTimes that fall outside the two hours before sendTime. Finally, the size of the remaining list gives the count you're after. Here is how I did it:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),
("user2", "2018-04-05 18:00:00", null),
("user2", "2018-04-05 19:00:00", null)
).toDF("id", "sendTime", "openTime")

var df2 = df.withColumn("sendUnix", unix_timestamp($"sendTime"))
  .withColumn("openUnix", unix_timestamp($"openTime"))

// Window over the current send and the two hours of sends preceding it.
val w = Window.partitionBy($"id").orderBy($"sendUnix").rangeBetween(-2*60*60, 0)

val df3 = df2.withColumn("opened", collect_list($"openUnix").over(w))
df3.show(false)
+-----+-------------------+-------------------+----------+----------+------------------------------------+
|id   |sendTime           |openTime           |sendUnix  |openUnix  |opened                              |
+-----+-------------------+-------------------+----------+----------+------------------------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800]                        |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800]            |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|[1522950600, 1522947000, 1522943400]|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800]                        |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800]            |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 18:00:00|null               |1522947600|null      |[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 19:00:00|null               |1522951200|null      |[1522946400, 1522947000]            |
+-----+-------------------+-------------------+----------+----------+------------------------------------+
val df4 = df3.selectExpr("id", "sendTime", "openTime", "sendUnix", "openUnix",
"size(filter(opened, x -> x < sendUnix AND  x > sendUnix - 7200)) as count")
df4.show(false)
+-----+-------------------+-------------------+----------+----------+-----+
|id   |sendTime           |openTime           |sendUnix  |openUnix  |count|
+-----+-------------------+-------------------+----------+----------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0    |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1    |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2    |
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|2    |
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0    |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1    |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2    |
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|1    |
|user2|2018-04-05 18:00:00|null               |1522947600|null      |3    |
|user2|2018-04-05 19:00:00|null               |1522951200|null      |2    |
+-----+-------------------+-------------------+----------+----------+-----+
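For reference, the same filtering step can also be written with expr() instead of selectExpr, and Spark 3.0 later added a typed Column API for the higher-order functions. Both variants below are sketches operating on the df3 built above.

// Spark 2.4+: identical count via expr() inside withColumn.
val df4b = df3.withColumn("count",
  expr("size(filter(opened, x -> x < sendUnix AND x > sendUnix - 7200))"))

// Spark 3.0+ only: the typed filter API avoids the SQL string entirely.
val df4c = df3.withColumn("count",
  size(filter($"opened", x => x < $"sendUnix" && x > $"sendUnix" - 7200)))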

Here you go. This version explodes the list collected over the window and counts the matching opens with a conditional sum, so it needs no higher-order functions and also works before Spark 2.4.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df1 = df.withColumn("sendTimeStamp", unix_timestamp(col("sendTime")))
  .withColumn("openTimeStamp", unix_timestamp(col("openTime")))

val w = Window.partitionBy('id).orderBy('sendTimeStamp).rangeBetween(-7200, 0)
var df2 = df1.withColumn("list", collect_list('openTimeStamp).over(w))
var df3 = df2.select(col("*"), explode('list).as("prevTimeStamp"))
df3.groupBy('id, 'sendTime)
  .agg(max('openTime).as("openTime"),
       sum(when(col("sendTimeStamp").minus(col("prevTimeStamp")).between(0, 7200), 1).otherwise(0)).as("count"))
  .show
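One caveat with the explode step: collect_list skips nulls, so if a user's earliest send has a null openTime, its collected list is empty, explode produces no row for it, and that send silently drops out of the result. Swapping in explode_outer (available since Spark 2.2) keeps such rows; a drop-in replacement for the select line above:

// explode_outer emits a null prevTimeStamp for an empty list; the when(...)
// condition is never true against null, so such sends get a count of 0.
var df3 = df2.select(col("*"), explode_outer('list).as("prevTimeStamp"))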
