Generating a unique session id for each dataframe row using custom logic



I have a dataframe sorted by user_id and timestamp, which looks like this:

user_id timestamp
1       1661941403

Using Spark SQL, the solution is fairly simple:

// Imports needed for toDF and for the column functions used below.
import spark.implicits._
import org.apache.spark.sql.functions._

// Generate sample data
val df = Seq(
  (1, 1661941403),
  (1, 1661941408),
  (1, 1661941412),
  (1, 1661962245)).toDF("user_id", "timestamp")
df.show
+-------+----------+
|user_id| timestamp|
+-------+----------+
|      1|1661941403|
|      1|1661941408|
|      1|1661941412|
|      1|1661962245|
+-------+----------+
// We'll use the lag() analytic function, so we declare the window over which
// each row is compared with its predecessor.
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("user_id").orderBy("timestamp")
val five_minutes = lit(5 * 60)
// If timestamp - prev(timestamp) >= 5 min, the row starts a new session.
// The cumulative sum of all session starts is then our session_id.
df.withColumn(
  "session_is_new",
  when(
    coalesce(
      col("timestamp") - lag("timestamp", 1).over(w),
      five_minutes
    ) >= five_minutes,
    1
  ).otherwise(0)
).withColumn(
  "session_id",
  sum("session_is_new").over(w)
).show
+-------+----------+--------------+----------+
|user_id| timestamp|session_is_new|session_id|
+-------+----------+--------------+----------+
|      1|1661941403|             1|         1|
|      1|1661941408|             0|         1|
|      1|1661941412|             0|         1|
|      1|1661962245|             1|         2|
+-------+----------+--------------+----------+
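The same gap-based logic can be sketched in plain Scala, without Spark, which may make the lag-plus-cumulative-sum idea easier to follow. This is an illustrative sketch for a single user's sorted timestamps; the function name and parameters are made up for the example:

```scala
// Sketch: for one user's sorted timestamps, a row starts a new session when
// the gap to the previous timestamp is at least the threshold; the running
// count of session starts is the session id. (Illustrative only.)
def sessionIds(timestamps: Seq[Long], gapSeconds: Long = 300L): Seq[Int] = {
  // Pair each timestamp with its predecessor (None for the first row, like lag()).
  val prev = None +: timestamps.init.map(Some(_))
  // 1 marks a session start; the first row always starts a session,
  // mirroring the coalesce() fallback in the Spark version.
  val isNew = timestamps.zip(prev).map {
    case (t, Some(p)) => if (t - p >= gapSeconds) 1 else 0
    case (_, None)    => 1
  }
  // Cumulative sum of session starts == session_id.
  isNew.scanLeft(0)(_ + _).tail
}

sessionIds(Seq(1661941403L, 1661941408L, 1661941412L, 1661962245L))
// → Seq(1, 1, 1, 2), matching the session_id column above
```

The Spark version computes exactly this per user_id partition, but in a distributed fashion via window functions.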
