根据条件更改窗口 pyspark 数据帧中的所有行值



我有一个pyspark数据帧,它有三列id,seq seq_checker。我需要按 id 排序并检查seq_checker列中的 4 个连续 1。

我尝试使用窗口函数。我无法根据条件更改窗口中的所有值。


new_window = Window.partitionBy().orderBy("id").rangeBetween(0, 3)output = df.withColumn('check_sequence',F.when(F.min(df['seq_checker']).over(new_window) == 1, True))

原始 PySpark DF:

+---+---+-----------+--------------+ |Id|seq|seq_checker|check_sequence| +---+---+-----------+--------------+ | 1| 1|         1|        假| | 2| 2|         1|        假| | 3| 3|         1|        假| | 4| 4|         1|        假| | 5|10|         0|        假| | 6|14|         1|        假| | 7|13|         1|        假| | 8|18|         0|        假| | 9|23|         0|        假| |10| 5|         0|        假| |11|56|         0|        假| |12|66|         0|        假| |13|34|         1|        假| |14|35|         1|        假| |15|36|         1|        假| |16|37|         1|        假| |17|39|         0|        假| |18|54|         0|        假| |19|68|         0|        假| |20|90|         0|        假| +---+---+-----------+--------------+

所需输出:

+---+---+-----------+--------------+ |Id|seq|seq_checker|check_sequence| +---+---+-----------+--------------+ | 1| 1|         1|         真| | 2| 2|         1|         真| | 3| 3|         1|         真| | 4| 4|         1|         真| | 5|10|         0|        假| | 6|14|         1|        假| | 7|13|         1|        假| | 8|18|         0|        假| | 9|23|         0|        假| |10| 5|         0|        假| |11|56|         0|        假| |12|66|         0|        假| |13|34|         1|         真| |14|35|         1|         真| |15|36|         1|         真| |16|37|         1|         真| |17|39|         0|        假| |18|54|         0|        假| |19|68|         0|        假| |20|90|         0|        假| +---+---+-----------+--------------+

基于上面的代码,我的输出是:

+---+---+-----------+--------------+ |Id|seq|seq_checker|check_sequence| +---+---+-----------+--------------+ | 1| 1|         1|         真| | 2| 2|         1|         空| | 3| 3|         1|         空| | 4| 4|         1|         空| | 5|10|         0|         空| | 6|14|         1|         空| | 7|13|         1|         空| | 8|18|         0|         空| | 9|23|         0|         空| |10| 5|         0|         空| |11|56|         0|         空| |12|66|         0|         空| |13|34|         1|         真| |14|35|         1|         空| |15|36|         1|         空| |16|37|         1|         空| |17|39|         0|         空| |18|54|         0|         空| |19|68|         0|         空| |20|90|         0|         空| +---+---+-----------+--------------+

编辑: 1.如果我们有超过4个连续的行具有1,我们需要将所有行check_sequence标志更改为True。

  1. 我的实际问题是检查"seq"列中长度大于 4 的序列。我能够使用超前和滞后函数创建seq_checker列。

最初定义一个只有id排序的窗口。然后使用行号差异方法(具有不同的顺序)将连续的 1(也分组连续的相同值)与相同的组号分组。分组完成后,只需检查组的maxmin是否为 1,并且组中至少有 4 个 1,即可获得所需的boolean输出。

from pyspark.sql.functions import row_number,count,when,min,max
w1 = Window.orderBy(df.id)
w2 = Window.orderBy(df.seq_checker,df.id)
groups = df.withColumn('grp',row_number().over(w1)-row_number().over(w2))
w3 = Window.partitionBy(groups.grp)
output = groups.withColumn('check_seq',(max(groups.seq_checker).over(w3)==1) & (min(groups.seq_checker).over(w3)==1) & (count(groups.id).over(w3) >= 4)
output.show()

rangeBetween 允许您访问相对于当前行的行。您为 0,3 定义了一个窗口,该窗口允许您访问当前行和随后的三行,但这只会为 1 的 4 个相交行中的前 1 个设置正确的值。4 个 1 的连续行的第二个元素需要与前一行和后面的两行 (-1,2) 相同。1 的 4 个连续行的第三个元素需要与前两行和之后的两行 (-2,1) 相提并论。最后,4 个 1 的正行的第四个元素需要与前三行 (-3,0) 相提并论。

import pyspark.sql.functions as F
from pyspark.sql import Window
l = [
(  1,  1,          1),
(  2,  2,          1),
(  3,  3,          1),
(  4,  4,          1),
(  5, 10,          0),
(  6, 14,          1),
(  7, 13,          1),
(  8, 18,          0),
(  9, 23,          0),
( 10,  5,          0),
( 11, 56,          0),
( 12, 66,          0),
( 13, 34,          1),
( 14, 35,          1),
( 15, 36,          1),
( 16, 37,          1),
( 17, 39,          0),
( 18, 54,          0),
( 19, 68,          0),
( 20, 90,          0)
]
columns = ['Id','seq','seq_checker']
df=spark.createDataFrame(l, columns)
w1 = Window.partitionBy().orderBy("id").rangeBetween(0, 3)
w2 = Window.partitionBy().orderBy("id").rangeBetween(-1, 2)
w3 = Window.partitionBy().orderBy("id").rangeBetween(-2, 1)
w4 = Window.partitionBy().orderBy("id").rangeBetween(-3, 0)
output = df.withColumn('check_sequence',F.when(
(F.min(df['seq_checker']).over(w1) == 1) |
(F.min(df['seq_checker']).over(w2) == 1) |
(F.min(df['seq_checker']).over(w3) == 1) |
(F.min(df['seq_checker']).over(w4) == 1) 
, True).otherwise(False))
output.show()

输出:

+---+---+-----------+--------------+ 
| Id|seq|seq_checker|check_sequence| 
+---+---+-----------+--------------+ 
|  1|  1|          1|          true| 
|  2|  2|          1|          true| 
|  3|  3|          1|          true| 
|  4|  4|          1|          true|          
|  5| 10|          0|          null| 
|  6| 14|          1|          null| 
|  7| 13|          1|          null| 
|  8| 18|          0|          null| 
|  9| 23|          0|          null| 
| 10|  5|          0|          null| 
| 11| 56|          0|          null| 
| 12| 66|          0|          null| 
| 13| 34|          1|          true| 
| 14| 35|          1|          true| 
| 15| 36|          1|          true| 
| 16| 37|          1|          true| 
| 17| 39|          0|          null| 
| 18| 54|          0|          null| 
| 19| 68|          0|          null| 
| 20| 90|          0|          null| 
+---+---+-----------+--------------+

最新更新