我有这个数据帧,显示每个用户的发送时间和打开时间:
val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),
("user2", "2018-04-05 18:00:00", null),
("user2", "2018-04-05 19:00:00", null)
).toDF("id", "sendTime", "openTime")
+-----+-------------------+-------------------+
| id| sendTime| openTime|
+-----+-------------------+-------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|
|user2|2018-04-05 18:00:00| null|
|user2|2018-04-05 19:00:00| null|
+-----+-------------------+-------------------+
现在,我想计算每个用户每次发送时间在过去两个小时内发生的打开次数。我使用窗口函数按用户进行分区,但我不知道如何将sendTime
列的值与openTime
列进行比较。结果数据帧应如下所示:
+-----+-------------------+-------------------+-----+
| id| sendTime| openTime|count|
+-----+-------------------+-------------------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00| 0|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00| 1|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00| 2|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00| 2|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00| 0|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00| 1|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00| 2|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00| 2|
|user2|2018-04-05 18:00:00| null| 3|
|user2|2018-04-05 19:00:00| null| 2|
+-----+-------------------+-------------------+-----+
这是我所得到的,但没有给我我需要的东西:
var df2 = df.withColumn("sendUnix", F.unix_timestamp($"sendTime")).withColumn("openUnix", F.unix_timestamp($"openTime"))
val w = Window.partitionBy($"id").orderBy($"sendUnix").rangeBetween(-2*60*60, 0)
df2 = df2.withColumn("count", F.count($"openUnix").over(w))
仅使用Window
函数似乎很难,因为在尝试从openTime
中导出值是否在上限sendTime
的最后 2 小时内时,您无法引用sendTime
的上限。
随着 Spark 2.4 的出现,您可以在此处阅读更高阶函数(https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html)。使用这些函数,您可以使用collect_list
函数收集窗口内的所有openTime
,然后使用高阶函数filter
过滤掉sendTime
前两小时之外的openTimes
。最后,您可以计算列表中剩余的值,以提供所需的计数。这是我执行此操作的代码。
import org.apache.spark.sql.expressions.Window
val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),
("user2", "2018-04-05 18:00:00", null),
("user2", "2018-04-05 19:00:00", null)
).toDF("id", "sendTime", "openTime")
var df2 = df.withColumn("sendUnix", unix_timestamp($"sendTime"))
.withColumn("openUnix", unix_timestamp($"openTime"))
val df3 = df2.withColumn("opened", collect_list($"openUnix").over(w))
df3.show(false)
+-----+-------------------+-------------------+----------+----------+------------------------------------+
|id |sendTime |openTime |sendUnix |openUnix |opened |
+-----+-------------------+-------------------+----------+----------+------------------------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800] |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800] |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|[1522950600, 1522947000, 1522943400]|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800] |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800] |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 18:00:00|null |1522947600|null |[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 19:00:00|null |1522951200|null |[1522946400, 1522947000] |
+-----+-------------------+-------------------+----------+----------+------------------------------------+
val df4 = df3.selectExpr("id", "sendTime", "openTime", "sendUnix", "openUnix",
"size(filter(opened, x -> x < sendUnix AND x > sendUnix - 7200)) as count")
df4.show(false)
+-----+-------------------+-------------------+----------+----------+-----+
|id |sendTime |openTime |sendUnix |openUnix |count|
+-----+-------------------+-------------------+----------+----------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0 |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1 |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2 |
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|2 |
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0 |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1 |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2 |
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|1 |
|user2|2018-04-05 18:00:00|null |1522947600|null |3 |
|user2|2018-04-05 19:00:00|null |1522951200|null |2 |
+-----+-------------------+-------------------+----------+----------+-----+
你去吧。
val df1 = df.withColumn("sendTimeStamp", unix_timestamp(col("sendTime"))).withColumn("openTimeStamp", unix_timestamp(col("openTime")))
val w = Window.partitionBy('id).orderBy('sendTimeStamp).rangeBetween(-7200, 0)
var df2 = df1.withColumn("list", collect_list('openTimeStamp).over(w))
var df3 = df2.select('*, explode('list).as("prevTimeStamp"))
df3.groupBy('id, 'sendTime).agg(max('openTime).as("openTime"), sum(when(col("sendTimeStamp").minus(col("prevTimeStamp")).between(0, 7200), 1).otherwise(0)).as("count")).show