Keeping only rows that satisfy specific conditions in PySpark



I'm processing logs with PySpark. I have two DataFrames: logsDF, which stores search queries, and clicksDF, which stores clicked document IDs.

They are structured as follows:

+-------------------+-------+----------------------+
|timestamp          |user   |query                 |
+-------------------+-------+----------------------+
|2021-12-01 06:14:38|m96cles|minoration            |
|2021-12-01 06:32:54|m96ngro|associés              |
|2021-12-01 06:40:40|m96mbeg|cessation             |
|2021-12-01 07:02:42|m96ngro|membres de société    |
|2021-12-01 07:02:58|m96ngro|cumul                 |
|2021-12-01 07:07:30|m96rara|cessation             |
|2021-12-01 07:09:37|m64nesc|INVF                  |
|2021-12-01 07:16:14|m83ccat|report didentifiation |
+-------------------+-------+----------------------+
+-------------------+-------+------+
|timestamp          |user   |doc_id|
+-------------------+-------+------+
|2021-12-01 06:14:42|m96cles|783   |
|2021-12-01 06:33:38|m96ngro|6057  |
|2021-12-01 06:40:52|m96mbeg|1407  |
|2021-12-01 06:49:12|m96mbeg|1414  |
|2021-12-01 06:53:19|m51cstr|15131 |
|2021-12-01 06:53:35|m51cstr|14992 |
|2021-12-01 06:53:55|m51cstr|15093 |
|2021-12-01 06:54:20|m51cstr|15110 |
+-------------------+-------+------+

I merged the two DataFrames with df = logs.unionByName(clicks, allowMissingColumns=True) and sorted by timestamp.
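In code, that step looks roughly like this (a minimal sketch, assuming the two frames are bound to the names logs and clicks used below):

# union the two frames; allowMissingColumns=True fills the columns missing
# on each side ('query' / 'doc_id') with nulls, then sort chronologically
df = logs.unionByName(clicks, allowMissingColumns=True).orderBy('timestamp')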

+-------------------+--------+--------------------+------+
|          timestamp|    user|               query|doc_id|
+-------------------+--------+--------------------+------+
|2022-05-31 20:23:40|ozenfada|                null|  7931|
|2022-05-31 21:06:44| m97emou|    apnée du sommeil|  null|
|2022-05-31 21:28:24| m64lbeh|                null|   192|
|2022-05-31 21:29:04| m97emou|                null|  3492|
+-------------------+--------+--------------------+------+

The idea is to keep only the rows whose search query led to a click; I don't want to keep the log rows for queries that led to no document click. To achieve this, I tried to look at the following rows for the same user and check whether they clicked at least one document within 5 minutes. In the end, I only want to keep the rows that contain a search query value.

Here is what I have so far; I tried to create a boolean column:

from pyspark.sql import functions as F
from pyspark.sql import Window as W

df = df.withColumn(
    'valid',
    df.user == F.lead('user').over(
        W.partitionBy(clicks.user, F.window('timestamp', '5 minutes')).orderBy('timestamp')
    )
)

Here is the desired output. Note that only rows containing a search query that led to a click get a true flag, i.e. a query row (query != null) followed by click rows (doc_id != null) with the same username. Also, the query "rech" led to the corrected query "recherche", and therefore should not be flagged as true:

+-------------------+--------+----------------------------------------+------+-----+
|timestamp          |user    |query                                   |doc_id|valid|
+-------------------+--------+----------------------------------------+------+-----+
|2022-05-31 18:56:47|m97bcar |exemple                                 |null  |false|
|2022-05-31 19:22:40|ozenfada|fort                                    |null  |true |
|2022-05-31 19:23:40|ozenfada|null                                    |7931  |false|
|2022-05-31 19:24:09|ozenfada|null                                    |1034  |false|
|2022-05-31 21:06:44|m97emou |apnée du sommeil                        |null  |true |
|2022-05-31 21:07:24|m64lbeh |rech                                    |null  |false|
|2022-05-31 21:07:40|m64lbeh |recherche                               |null  |true |
|2022-05-31 21:08:21|m64lbeh |null                                    |3002  |false|
|2022-05-31 21:11:04|m97emou |null                                    |3492  |false|
+-------------------+--------+----------------------------------------+------+-----+

Any help would be greatly appreciated.


The following gets all queries that led to a click within 5 minutes. A join on user does the job, with the additional condition that the timestamp difference between the query and the click is <= 5 minutes. The results printed below are for the provided sample data.

from pyspark.sql import functions as F

# rename columns to avoid ambiguity
logs = logs.withColumnRenamed('timestamp', 'query_timestamp')
clicks = clicks.withColumnRenamed('timestamp', 'click_timestamp')
clicks = clicks.withColumnRenamed('user', 'click_user')
# join on same username, and if the click is within 5 minutes of the query
time_diff_in_seconds = F.unix_timestamp(clicks['click_timestamp']) - F.unix_timestamp(logs['query_timestamp'])
join_cond = ((logs['user'] == clicks['click_user'])
             & (time_diff_in_seconds >= 0)
             & (time_diff_in_seconds <= 5 * 60))
df2 = logs.join(clicks, join_cond, how='left')
# drop all queries that didn't lead to a click
df2 = df2.filter(df2['doc_id'].isNotNull())
# select only the necessary columns
df2 = df2.select('query_timestamp', 'user', 'query').distinct()
df2.show()
+-------------------+-------+----------+
|    query_timestamp|   user|     query|
+-------------------+-------+----------+
|2021-12-01 06:14:38|m96cles|minoration|
|2021-12-01 06:40:40|m96mbeg| cessation|
|2021-12-01 06:32:54|m96ngro|  associés|
+-------------------+-------+----------+
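To reproduce the printed result, the sample frames can be built like this (a sketch; it assumes an active SparkSession named spark, relies on unix_timestamp parsing the default 'yyyy-MM-dd HH:mm:ss' string format, and should be run before the renaming block above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# a subset of the question's sample rows, with timestamps kept as strings
logs = spark.createDataFrame(
    [('2021-12-01 06:14:38', 'm96cles', 'minoration'),
     ('2021-12-01 06:32:54', 'm96ngro', 'associés'),
     ('2021-12-01 06:40:40', 'm96mbeg', 'cessation')],
    ['timestamp', 'user', 'query'])
clicks = spark.createDataFrame(
    [('2021-12-01 06:14:42', 'm96cles', '783'),
     ('2021-12-01 06:33:38', 'm96ngro', '6057'),
     ('2021-12-01 06:40:52', 'm96mbeg', '1407')],
    ['timestamp', 'user', 'doc_id'])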

Update - handling mistyped queries

Introduce a column that shows the time difference in seconds between the query and the click. After the join, both query rows are kept, but the mistyped query has the larger time difference. So order by the time difference with orderBy() and drop the second row; this can be done with dropDuplicates(['click_timestamp', 'user', 'doc_id']).

Assume minor is searched in the second row:

+-------------------+-------+--------------------+
|          timestamp|   user|               query|
+-------------------+-------+--------------------+
|2021-12-01 06:14:38|m96cles|          minoration|
|2021-12-01 06:14:39|m96cles|               minor|
... and rest of the rows
from pyspark.sql import functions as F

logs = logs.withColumnRenamed('timestamp', 'query_timestamp')
clicks = clicks.withColumnRenamed('timestamp', 'click_timestamp')
clicks = clicks.withColumnRenamed('user', 'click_user')
time_diff_in_seconds = F.unix_timestamp(clicks['click_timestamp']) - F.unix_timestamp(logs['query_timestamp'])
join_cond = ((logs['user'] == clicks['click_user'])
             & (time_diff_in_seconds >= 0)
             & (time_diff_in_seconds <= 5 * 60))
df2 = logs.join(clicks, join_cond, how='left')
df2 = df2.withColumn('time_diff_in_seconds', time_diff_in_seconds)
# ensures that if a query leads to multiple clicks, the duplicates caused by the left join are dropped
df2 = df2.orderBy('time_diff_in_seconds').dropDuplicates(['query_timestamp', 'user', 'query'])
# keep only the latest query that led to a click
df2 = df2.orderBy('time_diff_in_seconds').dropDuplicates(['click_timestamp', 'user', 'doc_id'])
df2 = df2.filter(df2['doc_id'].isNotNull())
df2 = df2.select('query_timestamp', 'user', 'query')
df2.show()
+-------------------+-------+---------+
|    query_timestamp|   user|    query|
+-------------------+-------+---------+
|2021-12-01 06:14:39|m96cles|    minor|
|2021-12-01 06:32:54|m96ngro| associés|
|2021-12-01 06:40:40|m96mbeg|cessation|
+-------------------+-------+---------+
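One caveat: orderBy() followed by dropDuplicates() is a widely used idiom, but Spark does not formally guarantee which duplicate survives after a shuffle. If determinism matters, a window with row_number() makes the choice explicit. A sketch that could replace the second dropDuplicates() step, reusing the time_diff_in_seconds column added above:

from pyspark.sql import functions as F, Window

# for each click, rank the joined query rows by time difference and keep the closest one
w = Window.partitionBy('click_timestamp', 'user', 'doc_id').orderBy('time_diff_in_seconds')
df2 = (df2.withColumn('rn', F.row_number().over(w))
          .filter(F.col('rn') == 1)
          .drop('rn'))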

You may want to test more complex scenarios and tweak the code a little. Based on the sample data, I believe this logic works.
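For completeness, the lead()-based idea from the question can also be made to work directly on the unioned frame to produce the boolean valid column from the desired output. A sketch, under the assumption implied by that output: a query counts as valid only when the very next event of the same user is a click within 5 minutes (which is why "rech" stays false even though a click follows the corrected "recherche"):

from pyspark.sql import functions as F, Window

w = Window.partitionBy('user').orderBy('timestamp')
next_doc = F.lead('doc_id').over(w)      # next event's doc_id (null if it was another query)
next_ts = F.lead('timestamp').over(w)    # next event's timestamp
df = df.withColumn(
    'valid',
    F.col('query').isNotNull()
    & next_doc.isNotNull()
    & ((F.unix_timestamp(next_ts) - F.unix_timestamp('timestamp')) <= 5 * 60)
)

Filtering with df.filter(F.col('valid')) then keeps only the query rows that led to a click.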
