如何在pyspark中根据窗口和条件过滤行



我需要删除行,其中对于相同的id、p_id和key_id,缺少反馈,但我们确实存在一些反馈。

输入

id p_id key_id feedback
1  p1   k1     happy
1  p1   k1     sad
1  p1   k2     sad
1  p1   k2     
1  p2   k3  
2  p1   k3     sad

输出

id p_id key_id feedback
1  p1   k1     happy
1  p1   k1     sad
1  p1   k2     sad
1  p2   k3  
2  p1   k3     sad

我如何在pyspark中实现这一点?

我会创建一个名为min_length的新列,并根据该列和feedback列进行筛选:

import pyspark.sql.functions as F
import pyspark.sql.window.Window as W
df = df.withColumn('min_length', 
F.min(F.length(F.trim(F.col('feedback'))))
.over(W.partitionBy('id', 'p_id', 'key_id'))
)
cond = (F.col('min_length') != 0) & (F.length(F.trim(F.col('feedback'))) == 0)
df.filter(~cond)

修剪只是剥离feedback列中的任何空间

您可以为每个键([idp_idkey_id](添加一列(我们称之为num_feedbacks(,该列统计您在DataFrame中对该键的反馈数量。然后,您可以过滤DataFrame,只保留有反馈的行(反馈不为Null(或没有任何特定键的反馈的行。

以下是代码示例:

key = ['id', 'p_id', 'key_id']
num_feedbacks = df.filter(col('feedback')!="")
.groupby(key).agg(F.count('feedback').alias('num_feedbacks'))
df = df.join(num_feedbacks, on=key, how='left')
.filter((col('feedback')!="") | (col('num_feedbacks').isNull()))
.drop('num_feedbacks')

这给了你:

+---+----+------+--------+
| id|p_id|key_id|feedback|
+---+----+------+--------+
|  2|  p1|    k3|     sad|
|  1|  p1|    k1|     sad|
|  1|  p1|    k1|   happy|
|  1|  p1|    k2|     sad|
|  1|  p2|    k3|        |
+---+----+------+--------+

最新更新