我有一个包含用户和登录时间的数据集。如果在首次登录后的24小时内有/其他登录,我需要标记重复。"活动"窗口打开,用户登录。例如,这里是样本数据集
用户登录-----------------------------用户1 12/1/19 8:00用户1 12/1/19 10:00用户1 12/1/19 23:00用户1 12/2/19 7:00用户1 12/2/19 8:00用户1 12/2/19 10:00用户1 12/3/19 23:00用户1 12/4/19 7:00用户2 12/4/19 8:00用户2 12/5/19 5:00user2 12/6/19 0:00
预期结果
用户登录重复---------------------------------user1 12/1/19 8:00 N这是user1的第一次登录-24小时窗口在此打开user1 12/1/19 10:00 Y,因为这是在24小时内用户1 12/1/19 23:00 Y用户1 12/2/19 7:00 Y用户1 12/2/19 8:00 Yuser1 12/2/19 10:00 N此活动时间大于(上一个窗口打开+24小时(。上一个窗口关闭,第二个窗口在此打开用户1 12/3/19 23:00 N用户1 12/4/19 7:00 Y用户2 12/4/19 8:00 N用户2 12/5/19 5:00 Yuser2 12/6/19 0:00 N
我查看了具有复杂条件的Spark SQL窗口函数,但如果用户登录间隔固定(比如说每18小时一次(,则此解决方案将不起作用。
这里是另一个例子(如果解决方案只考虑计算24小时窗口的第一个活动,它将为下面的记录#7给出不正确的结果(不重复((
user1 12/1/19 8:00 N这是user1的首次登录-24小时窗口在此打开user1 12/1/19 10:00 Y,因为这是在24小时内用户1 12/1/19 23:00 Y用户1 12/2/19 7:00 Y用户1 12/2/19 8:00 Yuser1 12/2/19 10:00 N此活动时间大于(上一个窗口打开+24小时(。上一个窗口关闭,第二个窗口在此打开**用户1 12/3/19 09:00 N**用户1 12/3/19 23:00 N用户1 12/4/19 7:00 Y用户2 12/4/19 8:00 N用户2 12/5/19 5:00 Yuser2 12/6/19 0:00 N
我不知道有任何内置的Spark函数可以根据上一个会话以动态方式结束的位置连续识别下一个24小时会话(或任何给定时间段(的开始。处理这种需求的一种方法是通过一个UDF来利用Scala的fold
函数:
def dupeFlags(tLimit: Long) = udf{ (logins: Seq[String], tsDiffs: Seq[Long]) =>
val flags = tsDiffs.foldLeft( (List[String](), 0L) ){ case ((flags, tsAcc), ts) =>
if (ts == 0 || tsAcc + ts > tLimit)
("N" :: flags, 0L)
else
("Y" :: flags, tsAcc + ts)
}._1.reverse
logins zip flags
}
UDF获取要处理的time-diff
列表(当前行和前一行之间的秒数(。注意,UDF中foldLeft
的累加器是(flags,tsAcc(的元组,其中:
flags
是要返回的重复标志的列表tsAcc
用于将有条件累积的时间戳值带到下一次迭代
还要注意,login-date
的列表只是"通过"的,以便包含在最终数据集中。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("user1", "12/1/19 8:00"),
("user1", "12/1/19 10:00"),
("user1", "12/1/19 23:00"),
("user1", "12/2/19 7:00"),
("user1", "12/2/19 8:00"),
("user1", "12/2/19 10:00"),
("user1", "12/3/19 9:00"),
("user1", "12/3/19 23:00"),
("user1", "12/4/19 7:00"),
("user2", "12/4/19 8:00"),
("user2", "12/5/19 5:00"),
("user2", "12/6/19 0:00")
).toDF("user", "login")
使用groupBy/collect_list
,time-diff
的列表和login-date
的列表被馈送到UDF以生成所需的重复标志,然后使用explode
:将其压平
val win1 = Window.partitionBy("user").orderBy("ts")
df.
withColumn("ts", unix_timestamp(to_timestamp($"login", "MM/dd/yy HH:mm"))).
withColumn("tsPrev", coalesce(lag($"ts", 1).over(win1), $"ts")).
groupBy("user").agg(collect_list($"login").as("logins"), collect_list($"ts" - $"tsPrev").as("tsDiffs")).
withColumn("tuple", explode(dupeFlags(60 * 60 * 24L)($"logins", $"tsDiffs"))).
select($"user", $"tuple._1".as("login"), $"tuple._2".as("duplicate")).
show
// +-----+-------------+---------+
// | user| login|duplicate|
// +-----+-------------+---------+
// |user1| 12/1/19 8:00| N|
// |user1|12/1/19 10:00| Y|
// |user1|12/1/19 23:00| Y|
// |user1| 12/2/19 7:00| Y|
// |user1| 12/2/19 8:00| Y|
// |user1|12/2/19 10:00| N|
// |user1| 12/3/19 9:00| Y|
// |user1|12/3/19 23:00| N|
// |user1| 12/4/19 7:00| Y|
// |user2| 12/4/19 8:00| N|
// |user2| 12/5/19 5:00| Y|
// |user2| 12/6/19 0:00| N|
// +-----+-------------+---------+
Python:这是我的scala代码的转换。
from pyspark.sql.functions import col, lag, unix_timestamp, to_timestamp, lit, when, row_number, first
from pyspark.sql import Window
w = Window.partitionBy("user", "index").orderBy("login")
df2 = df.withColumn("login", to_timestamp(col("login"), "MM/dd/yy HH:mm"))
df2.join(df2.groupBy("user").agg(first("login").alias("firstLogin")), "user", "left")
.withColumn("index", ((unix_timestamp(col("login")) - unix_timestamp(col("firstLogin"))) / 86400).cast("int"))
.withColumn("Duplicate", when(row_number().over(w) == 1, lit("N")).otherwise(lit("Y")))
.orderBy("user", "login")
.show(20)
Scala:您可以使用lag
函数和我制作的时差索引列
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("user", "index").orderBy("login")
val df2 = df.withColumn("login", to_timestamp($"login", "MM/dd/yy HH:mm"))
df2.join(df2.groupBy("user").agg(first("login").as("firstLogin")), Seq("user"), "left")
.withColumn("index", ((unix_timestamp(col("login")) - unix_timestamp(col("firstLogin"))) / 86400).cast("int"))
.withColumn("Duplicate", when(row_number.over(w) === 1, lit("N")).otherwise(lit("Y")))
.orderBy("user", "login")
.show
结果是:
+-----+-------------------+-------------------+-----+---------+
| user| login| firstLogin|index|Duplicate|
+-----+-------------------+-------------------+-----+---------+
|user1|2019-12-01 08:00:00|2019-12-01 08:00:00| 0| N|
|user1|2019-12-01 10:00:00|2019-12-01 08:00:00| 0| Y|
|user1|2019-12-01 23:00:00|2019-12-01 08:00:00| 0| Y|
|user1|2019-12-02 07:00:00|2019-12-01 08:00:00| 0| Y|
|user1|2019-12-02 08:00:00|2019-12-01 08:00:00| 1| N|
|user1|2019-12-02 10:00:00|2019-12-01 08:00:00| 1| Y|
|user1|2019-12-03 23:00:00|2019-12-01 08:00:00| 2| N|
|user1|2019-12-04 07:00:00|2019-12-01 08:00:00| 2| Y|
|user2|2019-12-04 08:00:00|2019-12-04 08:00:00| 0| N|
|user2|2019-12-05 05:00:00|2019-12-04 08:00:00| 0| Y|
|user2|2019-12-06 00:00:00|2019-12-04 08:00:00| 1| N|
+-----+-------------------+-------------------+-----+---------+