PySpark中的窗口函数和条件过滤器



有没有一种方法可以有条件地将filter应用于pyspark中的窗口函数?对于col1中的每个组,我希望只保留在col2中具有X的行。如果一个组在col2中没有X,我希望保留该组中的所有行。

+------+------+
| col1 | col2 |
+------+------+
| A    |      |
+------+------+
| A    | X    |
+------+------+
| A    |      |
+------+------+
| B    |      |
+------+------+
| B    |      |
+------+------+
| B    |      |
+------+------+

您可以使用max窗口函数来表示组(由col1分区(,该组在col2中具有标识符(本例中为1(的"X"。没有"X"的组将被分配null。之后,只需过滤中间数据帧即可获得所需的结果。

from pyspark.sql import Window
from pyspark.sql.functions import max,when
w = Window.partitionBy(df.col1)
df_1 = df.withColumn('x_exists',max(when(df.col2 == 'X',1)).over(w))
df_2 = df_1.filter(((df_1.x_exists == 1) & (df_1.col2 == 'X')) | df_1.x_exists.isNull())
df_2.show()

使用collect_list和更多SQL语法的替代方案:collect_list跳过NULL值,我们使用if(col2='X',1,NULL)作为列表项,这样当col2中没有显示"X"时,该collect_list的大小为ZERO:

from pyspark.sql.functions import expr                                                                              
df_new = df.withColumn('has_X', expr("size(collect_list(if(col2='X',1,NULL)) OVER (partition by col1))>0")) 
.filter("col2 = 'X' OR !has_X")

最新更新