I have a dataframe sorted by user_id and timestamp, which looks like this:
user_id | timestamp
---|---
1 | 1661941403
Using Spark SQL, the solution is quite simple:
// Generate sample data (spark.implicits._ is pre-imported in spark-shell;
// in a standalone app, import it from your SparkSession to get toDF)
import spark.implicits._

val df = Seq(
  (1, 1661941403),
  (1, 1661941408),
  (1, 1661941412),
  (1, 1661962245)
).toDF("user_id", "timestamp")
+-------+----------+
|user_id| timestamp|
+-------+----------+
| 1|1661941403|
| 1|1661941408|
| 1|1661941412|
| 1|1661962245|
+-------+----------+
// We'll use the lag() analytic function, so we declare the window
// and the period against which to compare.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("user_id").orderBy("timestamp")
val five_minutes = lit(5 * 60)
// If timestamp - prev(timestamp) >= 5 min then the session is new.
// Cumulative sum of all session starts will be our session_id.
df.withColumn(
  "session_new",
  when(
    coalesce(
      // lag() is null for the first row of each user; coalescing to
      // five_minutes makes that row always start a new session
      col("timestamp") - lag("timestamp", 1).over(w),
      five_minutes
    ) >= five_minutes,
    1
  ).otherwise(0)
).withColumn(
  "session_id",
  sum("session_new").over(w)
).show
+-------+----------+-----------+----------+
|user_id| timestamp|session_new|session_id|
+-------+----------+-----------+----------+
| 1|1661941403| 1| 1|
| 1|1661941408| 0| 1|
| 1|1661941412| 0| 1|
| 1|1661962245| 1| 2|
+-------+----------+-----------+----------+
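The same gap-and-cumulative-sum logic can be sanity-checked without a Spark cluster. Here is a plain-Scala sketch (the object and method names are mine, not any library API) that flags a session start whenever the gap from the previous timestamp is at least five minutes and running-sums the flags:

```scala
// Plain-Scala sketch of the gap-based sessionization; no Spark needed.
object SessionizeSketch {
  val gapSeconds: Long = 5 * 60

  // For one user's timestamps (already sorted ascending), flag each row
  // that starts a new session, then take a running sum of the flags.
  def sessionIds(timestamps: Seq[Long]): Seq[Int] = {
    val isNew = timestamps.zip(Long.MinValue +: timestamps).map {
      // The first row pairs with the MinValue sentinel as "previous",
      // so it always opens session 1, mirroring coalesce(..., five_minutes).
      case (t, prev) => if (prev == Long.MinValue || t - prev >= gapSeconds) 1 else 0
    }
    isNew.scanLeft(0)(_ + _).tail // cumulative sum of flags = session_id
  }
}
```

Running it on the sample timestamps gives session ids 1, 1, 1, 2, matching the Spark output above.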