Pyspark: filtering rows with a window function



I have the following dataframe and data:

Id        data          time
9999      RANGE= 0  14-02-2022 03:47
9999      RANGE= 0  14-02-2022 03:51
9999      RANGE= 1  14-02-2022 03:59
9999      RANGE= 1  14-02-2022 04:01
9999      RANGE= 0  14-02-2022 13:04
9999      RANGE= 0  14-02-2022 13:06
9999      RANGE= 0  14-02-2022 13:10
9999      RANGE= 1  14-02-2022 13:14
9999      RANGE= 1  14-02-2022 13:16
9999      RANGE= 1  14-02-2022 13:18
9999      RANGE= 0  14-02-2022 13:19
9999      RANGE= 0  14-02-2022 13:20
9999      RANGE= 0  14-02-2022 13:23

Starting from the top, look for a RANGE= 1 record; once found, ignore all following RANGE= 1 records until a RANGE= 0 record appears, keep that one, then ignore the following RANGE= 0 records until the next RANGE= 1 appears, and so on. In other words, keep only the first row of each consecutive run of the same value, starting from the first RANGE= 1.

The final dataframe should look like:

Id        data          time
9999      RANGE= 1  14-02-2022 03:59
9999      RANGE= 0  14-02-2022 13:04
9999      RANGE= 1  14-02-2022 13:14
9999      RANGE= 0  14-02-2022 13:19

How can this be done with pyspark window functions?

from pyspark.sql.window import Window
from pyspark.sql import functions as F

data = [('9999','RANGE= 1','14-02-2022 03:40'),
        ('9999','RANGE= 0','14-02-2022 03:47'),('9999','RANGE= 0','14-02-2022 03:51'),('9999','RANGE= 1','14-02-2022 03:59'),
        ('9999','RANGE= 1','14-02-2022 04:01'),('9999','RANGE= 0','14-02-2022 13:04'),('9999','RANGE= 0','14-02-2022 13:06'),
        ('9999','RANGE= 0','14-02-2022 13:10'),('9999','RANGE= 1','14-02-2022 13:14'),('9999','RANGE= 1','14-02-2022 13:16'),
        ('9999','RANGE= 1','14-02-2022 13:18'),('9999','RANGE= 0','14-02-2022 13:19'),('9999','RANGE= 0','14-02-2022 13:20'),
        ('9999','RANGE= 0','14-02-2022 13:23')]

cols = ["Id","data","time"]

df = spark.createDataFrame(data = data, schema = cols)

w = Window.partitionBy().orderBy("time")
df = spark.createDataFrame(data = data, schema = cols)
df = df.withColumn("rown", F.row_number().over(w))
df2 = df.filter(df.rown == 1)
df2 = df2.filter(df2.data == 'RANGE= 1').drop("rown","next_time","next_data")
df = df.withColumn("next_time", F.lead(df.time).over(my_window))
df = df.withColumn("next_data", F.lead(df.data).over(w))
df = df.filter(df.next_data != df.data).drop("rown","data","time")

Then union df2 and df.
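A minimal sketch of that union step, assuming the column names produced by the snippet above (next_data/next_time on df, and Id/data/time on df2); the df_final name is only for illustration:

df = df.withColumnRenamed("next_data", "data") \
       .withColumnRenamed("next_time", "time")

# Align the column order of both frames, union them and restore the time order
df_final = df2.select("Id", "data", "time") \
              .union(df.select("Id", "data", "time")) \
              .orderBy("time")
df_final.show(truncate=False)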

One way using a window function:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
my_window = Window.orderBy("time")
df = df.withColumn("prev", F.lag(df.data).over(my_window))
df = df.withColumn("diff", F.when(df.data == df.prev, 0)
.otherwise(1))
df = df.filter(df.diff ==1).drop("prev","diff")
