I'm processing logs with PySpark. I have two DataFrames: a logs DF that stores search queries, and a clicks DF that stores clicked document IDs.
The structures are as follows:
+-------------------+-------+----------------------+
|timestamp |user |query |
+-------------------+-------+----------------------+
|2021-12-01 06:14:38|m96cles|minoration |
|2021-12-01 06:32:54|m96ngro|associés |
|2021-12-01 06:40:40|m96mbeg|cessation |
|2021-12-01 07:02:42|m96ngro|membres de société |
|2021-12-01 07:02:58|m96ngro|cumul |
|2021-12-01 07:07:30|m96rara|cessation |
|2021-12-01 07:09:37|m64nesc|INVF |
|2021-12-01 07:16:14|m83ccat|report didentifiation |
+-------------------+-------+----------------------+
+-------------------+-------+------+
|timestamp |user |doc_id|
+-------------------+-------+------+
|2021-12-01 06:14:42|m96cles|783 |
|2021-12-01 06:33:38|m96ngro|6057 |
|2021-12-01 06:40:52|m96mbeg|1407 |
|2021-12-01 06:49:12|m96mbeg|1414 |
|2021-12-01 06:53:19|m51cstr|15131 |
|2021-12-01 06:53:35|m51cstr|14992 |
|2021-12-01 06:53:55|m51cstr|15093 |
|2021-12-01 06:54:20|m51cstr|15110 |
+-------------------+-------+------+
I merged the two DataFrames with df = logs.unionByName(clicks, allowMissingColumns=True) and sorted the result by timestamp.
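A small sketch of that step, assuming both DataFrames are already loaded (the explicit orderBy call is my assumption of how the sorting was done):

df = logs.unionByName(clicks, allowMissingColumns=True)
# order the merged log chronologically
df = df.orderBy('timestamp')

The merged result looks like this: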
+-------------------+--------+--------------------+------+
| timestamp| user| query|doc_id|
+-------------------+--------+--------------------+------+
|2022-05-31 20:23:40|ozenfada| null| 7931|
|2022-05-31 21:06:44| m97emou| apnée du sommeil| null|
|2022-05-31 21:28:24| m64lbeh| null| 192|
|2022-05-31 21:29:04| m97emou| null| 3492|
+-------------------+--------+--------------------+------+
The idea is to keep only the rows whose search queries led to a click; I don't want to keep logs of queries that resulted in no document click. To achieve this, I'm trying to look at the following rows for the same user to see whether they clicked at least one document within 5 minutes. In the end, I only want to keep the rows that have a search query value.
Here is what I have so far; I tried to create a boolean column:
from pyspark.sql import functions as F
from pyspark.sql import Window as W

df = df.withColumn('valid',
    df.user == F.lead('user').over(
        W.partitionBy('user', F.window('timestamp', '5 minutes')).orderBy('timestamp')
    )
)
Here is the desired output. Note that only rows containing a search query that led to a click get the true flag, i.e. query rows (query != null) that are followed by one or more "click" rows (doc_id != null) with the same username. Also, the query "rech" led to the corrected query "recherche", so it should not be flagged true.
+-------------------+--------+----------------------------------------+------+-----+
|timestamp |user |query |doc_id|valid|
+-------------------+--------+----------------------------------------+------+-----+
|2022-05-31 18:56:47|m97bcar |exemple |null |false|
|2022-05-31 19:22:40|ozenfada|fort |null |true |
|2022-05-31 19:23:40|ozenfada|null |7931 |false|
|2022-05-31 19:24:09|ozenfada|null |1034 |false|
|2022-05-31 21:06:44|m97emou |apnée du sommeil |null |true |
|2022-05-31 21:07:24|m64lbeh |rech |192 |false|
|2022-05-31 21:07:40|m64lbeh |recherche |null |true |
|2022-05-31 21:08:21|m64lbeh |null |3002 |false|
|2022-05-31 21:11:04|m97emou |null |3492 |false|
+-------------------+--------+----------------------------------------+------+-----+
Any help would be greatly appreciated.
The following will get all queries that led to a click within 5 minutes. A join on user works, with the additional condition that the timestamp difference between the query and the click is <= 5 minutes. The results printed below are for the provided sample data.
from pyspark.sql import functions as F

# rename columns to avoid ambiguity
logs = logs.withColumnRenamed('timestamp', 'query_timestamp')
clicks = clicks.withColumnRenamed('timestamp', 'click_timestamp')
clicks = clicks.withColumnRenamed('user', 'click_user')

# join on same username, and only if the click is within 5 minutes of the query
time_diff_in_seconds = F.unix_timestamp(clicks['click_timestamp']) - F.unix_timestamp(logs['query_timestamp'])
join_cond = (logs['user'] == clicks['click_user']) & \
            (time_diff_in_seconds >= 0) & \
            (time_diff_in_seconds <= 5 * 60)
df2 = logs.join(clicks, join_cond, how='left')

# drop all queries that didn't lead to a click
df2 = df2.filter(df2['doc_id'].isNotNull())

# select only the necessary columns
df2 = df2.select('query_timestamp', 'user', 'query').distinct()
df2.show()
+-------------------+-------+----------+
| query_timestamp| user| query|
+-------------------+-------+----------+
|2021-12-01 06:14:38|m96cles|minoration|
|2021-12-01 06:40:40|m96mbeg| cessation|
|2021-12-01 06:32:54|m96ngro| associés|
+-------------------+-------+----------+
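If you also need the boolean valid flag on the unioned DataFrame, as in your desired output, one way is to join the surviving queries back (a sketch; valid_queries is a name I introduce, and it assumes df2 from above holds the valid queries):

# mark every (timestamp, user, query) that survived as a valid query
valid_queries = (df2
    .withColumnRenamed('query_timestamp', 'timestamp')
    .withColumn('valid', F.lit(True)))

# click rows have query = null, so they never match the join keys
# and fall back to false via fillna
df = (df
    .join(valid_queries, on=['timestamp', 'user', 'query'], how='left')
    .fillna(False, subset=['valid']))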
Update - handling misspelled queries
Introduce a column that shows the time difference in seconds between the query and the click. After the join, both rows are retained, but the misspelled query has the larger time difference. So, orderBy() on the time difference and drop the second row. This can be done with dropDuplicates(['click_timestamp', 'user', 'doc_id']).
Assume a search for minor in the second row:
+-------------------+-------+--------------------+
| timestamp| user| query|
+-------------------+-------+--------------------+
|2021-12-01 06:14:38|m96cles| minoration|
|2021-12-01 06:14:39|m96cles| minor|
... and rest of the rows
logs = logs.withColumnRenamed('timestamp', 'query_timestamp')
clicks = clicks.withColumnRenamed('timestamp', 'click_timestamp')
clicks = clicks.withColumnRenamed('user', 'click_user')

time_diff_in_seconds = F.unix_timestamp(clicks['click_timestamp']) - F.unix_timestamp(logs['query_timestamp'])
join_cond = (logs['user'] == clicks['click_user']) & \
            (time_diff_in_seconds >= 0) & \
            (time_diff_in_seconds <= 5 * 60)
df2 = logs.join(clicks, join_cond, how='left')
df2 = df2.withColumn('time_diff_in_seconds', time_diff_in_seconds)

# ensures that if a query leads to multiple clicks, the duplicates caused by the left join are dropped
df2 = df2.orderBy('time_diff_in_seconds').dropDuplicates(['query_timestamp', 'user', 'query'])

# keep only the latest query that led to a click
df2 = df2.orderBy('time_diff_in_seconds').dropDuplicates(['click_timestamp', 'user', 'doc_id'])

df2 = df2.filter(df2['doc_id'].isNotNull())
df2 = df2.select('query_timestamp', 'user', 'query')
df2.show()
+-------------------+-------+---------+
| query_timestamp| user| query|
+-------------------+-------+---------+
|2021-12-01 06:14:39|m96cles| minor|
|2021-12-01 06:32:54|m96ngro| associés|
|2021-12-01 06:40:40|m96mbeg|cessation|
+-------------------+-------+---------+
You may want to test more complex scenarios, and you might need to modify the code slightly. Based on the sample data, I think this logic works.
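One caveat: Spark does not guarantee that dropDuplicates() keeps the first row of each group after an orderBy(), because the data can be reshuffled between the two operations. If you see non-deterministic results at scale, a row_number() window is a more explicit alternative (a sketch; the Window import, the w spec and the rn column are mine):

from pyspark.sql import Window

# per click, keep only the query with the smallest time difference
# (query_timestamp breaks ties deterministically)
w = Window.partitionBy('click_timestamp', 'user', 'doc_id') \
          .orderBy('time_diff_in_seconds', 'query_timestamp')
df2 = (df2
    .withColumn('rn', F.row_number().over(w))
    .filter(F.col('rn') == 1)
    .drop('rn'))

The same pattern with partition keys ('query_timestamp', 'user', 'query') replaces the first dropDuplicates() call.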