Spark在首次登录后24小时内标记重复用户登录



我有一个包含用户和登录时间的数据集。如果在首次登录后的24小时内有/其他登录,我需要标记重复。"活动"窗口打开,用户登录。例如,这里是样本数据集

用户登录-----------------------------用户1 12/1/19 8:00用户1 12/1/19 10:00用户1 12/1/19 23:00用户1 12/2/19 7:00用户1 12/2/19 8:00用户1 12/2/19 10:00用户1 12/3/19 23:00用户1 12/4/19 7:00用户2 12/4/19 8:00用户2 12/5/19 5:00user2 12/6/19 0:00

预期结果

用户登录重复---------------------------------user1 12/1/19 8:00 N这是user1的第一次登录-24小时窗口在此打开user1 12/1/19 10:00 Y,因为这是在24小时内用户1 12/1/19 23:00 Y用户1 12/2/19 7:00 Y用户1 12/2/19 8:00 Yuser1 12/2/19 10:00 N此活动时间大于(上一个窗口打开+24小时(。上一个窗口关闭,第二个窗口在此打开用户1 12/3/19 23:00 N用户1 12/4/19 7:00 Y用户2 12/4/19 8:00 N用户2 12/5/19 5:00 Yuser2 12/6/19 0:00 N

我查看了具有复杂条件的Spark SQL窗口函数,但如果用户登录间隔固定(比如说每18小时一次(,则此解决方案将不起作用。

这里是另一个例子(如果解决方案只考虑计算24小时窗口的第一个活动,它将为下面的记录#7给出不正确的结果(不重复((

user1 12/1/19 8:00 N这是user1的首次登录-24小时窗口在此打开user1 12/1/19 10:00 Y,因为这是在24小时内用户1 12/1/19 23:00 Y用户1 12/2/19 7:00 Y用户1 12/2/19 8:00 Yuser1 12/2/19 10:00 N此活动时间大于(上一个窗口打开+24小时(。上一个窗口关闭,第二个窗口在此打开**用户1 12/3/19 09:00 N**用户1 12/3/19 23:00 N用户1 12/4/19 7:00 Y用户2 12/4/19 8:00 N用户2 12/5/19 5:00 Yuser2 12/6/19 0:00 N

我不知道有任何内置的Spark函数可以根据上一个会话以动态方式结束的位置连续识别下一个24小时会话(或任何给定时间段(的开始。处理这种需求的一种方法是通过一个UDF来利用Scala的fold函数:

def dupeFlags(tLimit: Long) = udf{ (logins: Seq[String], tsDiffs: Seq[Long]) =>
val flags = tsDiffs.foldLeft( (List[String](), 0L) ){ case ((flags, tsAcc), ts) =>
if (ts == 0 || tsAcc + ts > tLimit)
("N" :: flags, 0L)
else
("Y" :: flags, tsAcc + ts)
}._1.reverse
logins zip flags
}

UDF获取要处理的time-diff列表(当前行和前一行之间的秒数(。注意,UDF中foldLeft的累加器是(flags,tsAcc(的元组,其中:

  • flags是要返回的重复标志的列表
  • tsAcc用于将有条件累积的时间戳值带到下一次迭代

还要注意,login-date的列表只是"通过"的,以便包含在最终数据集中。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("user1", "12/1/19 8:00"),
("user1", "12/1/19 10:00"),
("user1", "12/1/19 23:00"),
("user1", "12/2/19 7:00"),
("user1", "12/2/19 8:00"),
("user1", "12/2/19 10:00"),
("user1", "12/3/19 9:00"),
("user1", "12/3/19 23:00"),
("user1", "12/4/19 7:00"),
("user2", "12/4/19 8:00"),
("user2", "12/5/19 5:00"),
("user2", "12/6/19 0:00")
).toDF("user", "login")

使用groupBy/collect_listtime-diff的列表和login-date的列表被馈送到UDF以生成所需的重复标志,然后使用explode:将其压平

val win1 = Window.partitionBy("user").orderBy("ts")
df.
withColumn("ts", unix_timestamp(to_timestamp($"login", "MM/dd/yy HH:mm"))).
withColumn("tsPrev", coalesce(lag($"ts", 1).over(win1), $"ts")).
groupBy("user").agg(collect_list($"login").as("logins"), collect_list($"ts" - $"tsPrev").as("tsDiffs")).
withColumn("tuple", explode(dupeFlags(60 * 60 * 24L)($"logins", $"tsDiffs"))).
select($"user", $"tuple._1".as("login"), $"tuple._2".as("duplicate")).
show
// +-----+-------------+---------+
// | user|        login|duplicate|
// +-----+-------------+---------+
// |user1| 12/1/19 8:00|        N|
// |user1|12/1/19 10:00|        Y|
// |user1|12/1/19 23:00|        Y|
// |user1| 12/2/19 7:00|        Y|
// |user1| 12/2/19 8:00|        Y|
// |user1|12/2/19 10:00|        N|
// |user1| 12/3/19 9:00|        Y|
// |user1|12/3/19 23:00|        N|
// |user1| 12/4/19 7:00|        Y|
// |user2| 12/4/19 8:00|        N|
// |user2| 12/5/19 5:00|        Y|
// |user2| 12/6/19 0:00|        N|
// +-----+-------------+---------+

Python:这是我的scala代码的转换。

from pyspark.sql.functions import col, lag, unix_timestamp, to_timestamp, lit, when, row_number, first
from pyspark.sql import Window
w = Window.partitionBy("user", "index").orderBy("login")
df2 = df.withColumn("login", to_timestamp(col("login"), "MM/dd/yy HH:mm"))
df2.join(df2.groupBy("user").agg(first("login").alias("firstLogin")), "user", "left") 
.withColumn("index", ((unix_timestamp(col("login")) - unix_timestamp(col("firstLogin"))) / 86400).cast("int")) 
.withColumn("Duplicate", when(row_number().over(w) == 1, lit("N")).otherwise(lit("Y"))) 
.orderBy("user", "login") 
.show(20)

Scala:您可以使用lag函数和我制作的时差索引列

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("user", "index").orderBy("login")
val df2 = df.withColumn("login", to_timestamp($"login", "MM/dd/yy HH:mm"))
df2.join(df2.groupBy("user").agg(first("login").as("firstLogin")), Seq("user"), "left")
.withColumn("index", ((unix_timestamp(col("login")) - unix_timestamp(col("firstLogin"))) / 86400).cast("int"))
.withColumn("Duplicate", when(row_number.over(w) === 1, lit("N")).otherwise(lit("Y")))
.orderBy("user", "login")
.show

结果是:

+-----+-------------------+-------------------+-----+---------+
| user|              login|         firstLogin|index|Duplicate|
+-----+-------------------+-------------------+-----+---------+
|user1|2019-12-01 08:00:00|2019-12-01 08:00:00|    0|        N|
|user1|2019-12-01 10:00:00|2019-12-01 08:00:00|    0|        Y|
|user1|2019-12-01 23:00:00|2019-12-01 08:00:00|    0|        Y|
|user1|2019-12-02 07:00:00|2019-12-01 08:00:00|    0|        Y|
|user1|2019-12-02 08:00:00|2019-12-01 08:00:00|    1|        N|
|user1|2019-12-02 10:00:00|2019-12-01 08:00:00|    1|        Y|
|user1|2019-12-03 23:00:00|2019-12-01 08:00:00|    2|        N|
|user1|2019-12-04 07:00:00|2019-12-01 08:00:00|    2|        Y|
|user2|2019-12-04 08:00:00|2019-12-04 08:00:00|    0|        N|
|user2|2019-12-05 05:00:00|2019-12-04 08:00:00|    0|        Y|
|user2|2019-12-06 00:00:00|2019-12-04 08:00:00|    1|        N|
+-----+-------------------+-------------------+-----+---------+

最新更新