I have the following dataframe with this data:
Id data time
9999 RANGE= 0 14-02-2022 03:47
9999 RANGE= 0 14-02-2022 03:51
9999 RANGE= 1 14-02-2022 03:59
9999 RANGE= 1 14-02-2022 04:01
9999 RANGE= 0 14-02-2022 13:04
9999 RANGE= 0 14-02-2022 13:06
9999 RANGE= 0 14-02-2022 13:10
9999 RANGE= 1 14-02-2022 13:14
9999 RANGE= 1 14-02-2022 13:16
9999 RANGE= 1 14-02-2022 13:18
9999 RANGE= 0 14-02-2022 13:19
9999 RANGE= 0 14-02-2022 13:20
9999 RANGE= 0 14-02-2022 13:23
First look for RANGE= 1; once found, ignore all further RANGE= 1 records until a RANGE= 0 record appears; then look for RANGE= 1 again while ignoring RANGE= 0 records, and so on.
The final dataframe should look like:
Id data time
9999 RANGE= 1 14-02-2022 03:59
9999 RANGE= 0 14-02-2022 13:04
9999 RANGE= 1 14-02-2022 13:14
9999 RANGE= 0 14-02-2022 13:19
How can I achieve this using a PySpark window function?
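For reference, the selection rule can be sanity-checked with a plain-Python sketch (not the requested window-function solution): keep the next record whose value matches the one we are looking for, then flip the target.

```python
def alternating(records):
    """Keep the first 'RANGE= 1' record, then alternate targets."""
    kept, looking_for = [], 'RANGE= 1'   # start by looking for RANGE= 1
    for rec_id, data, time in records:
        if data == looking_for:
            kept.append((rec_id, data, time))
            # flip the value we are looking for
            looking_for = 'RANGE= 0' if looking_for == 'RANGE= 1' else 'RANGE= 1'
    return kept

rows = [('9999', 'RANGE= 0', '14-02-2022 03:47'),
        ('9999', 'RANGE= 0', '14-02-2022 03:51'),
        ('9999', 'RANGE= 1', '14-02-2022 03:59'),
        ('9999', 'RANGE= 1', '14-02-2022 04:01'),
        ('9999', 'RANGE= 0', '14-02-2022 13:04'),
        ('9999', 'RANGE= 0', '14-02-2022 13:06'),
        ('9999', 'RANGE= 0', '14-02-2022 13:10'),
        ('9999', 'RANGE= 1', '14-02-2022 13:14'),
        ('9999', 'RANGE= 1', '14-02-2022 13:16'),
        ('9999', 'RANGE= 1', '14-02-2022 13:18'),
        ('9999', 'RANGE= 0', '14-02-2022 13:19'),
        ('9999', 'RANGE= 0', '14-02-2022 13:20'),
        ('9999', 'RANGE= 0', '14-02-2022 13:23')]

for row in alternating(rows):
    print(row)
```

Running this on the sample data yields exactly the four rows of the expected dataframe.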
from pyspark.sql.window import Window
from pyspark.sql import functions as F

data = [('9999', 'RANGE= 1', '14-02-2022 03:40'),
        ('9999', 'RANGE= 0', '14-02-2022 03:47'), ('9999', 'RANGE= 0', '14-02-2022 03:51'),
        ('9999', 'RANGE= 1', '14-02-2022 03:59'), ('9999', 'RANGE= 1', '14-02-2022 04:01'),
        ('9999', 'RANGE= 0', '14-02-2022 13:04'), ('9999', 'RANGE= 0', '14-02-2022 13:06'),
        ('9999', 'RANGE= 0', '14-02-2022 13:10'), ('9999', 'RANGE= 1', '14-02-2022 13:14'),
        ('9999', 'RANGE= 1', '14-02-2022 13:16'), ('9999', 'RANGE= 1', '14-02-2022 13:18'),
        ('9999', 'RANGE= 0', '14-02-2022 13:19'), ('9999', 'RANGE= 0', '14-02-2022 13:20'),
        ('9999', 'RANGE= 0', '14-02-2022 13:23')]
cols = ["Id", "data", "time"]
df = spark.createDataFrame(data=data, schema=cols)

w = Window.partitionBy().orderBy("time")
df = df.withColumn("rown", F.row_number().over(w))

# keep the very first row only when it is already a RANGE= 1 record
df2 = df.filter(df.rown == 1).filter(df.data == 'RANGE= 1').drop("rown")

# a row whose next value differs marks a change; the next row is the one to keep,
# so carry its value and time forward and rename them back to data/time
df = df.withColumn("next_time", F.lead(df.time).over(w))
df = df.withColumn("next_data", F.lead(df.data).over(w))
df = (df.filter(df.next_data != df.data)
        .select("Id",
                F.col("next_data").alias("data"),
                F.col("next_time").alias("time")))
Then we union df2 and df to get the final dataframe.
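Taken together, df2 plus the filtered change rows behave like the plain-Python sketch below (shown on a shortened sample). It also shows why the separate df2 step is needed: lead() looks forward, so the change filter can never emit the very first row.

```python
# Plain-Python model of df2.union(df): keep the first row only if it is
# RANGE= 1, then keep every row whose value differs from the previous row.
def union_result(records):
    kept, prev = [], None
    for rec in records:
        data = rec[1]
        if prev is None:
            # the lead()-based filter never emits row 1; df2 covers this case
            if data == 'RANGE= 1':
                kept.append(rec)
        elif data != prev:
            kept.append(rec)
        prev = data
    return kept

sample = [('9999', 'RANGE= 1', '14-02-2022 03:40'),
          ('9999', 'RANGE= 0', '14-02-2022 03:47'),
          ('9999', 'RANGE= 0', '14-02-2022 03:51'),
          ('9999', 'RANGE= 1', '14-02-2022 03:59'),
          ('9999', 'RANGE= 1', '14-02-2022 04:01'),
          ('9999', 'RANGE= 0', '14-02-2022 13:04'),
          ('9999', 'RANGE= 1', '14-02-2022 13:14'),
          ('9999', 'RANGE= 0', '14-02-2022 13:19')]

for row in union_result(sample):
    print(row)
```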
One way using a window function:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# note: "time" is a string in dd-MM-yyyy HH:mm format; for data spanning
# several days, convert it to a timestamp before ordering
my_window = Window.orderBy("time")

# compare each row with the previous one; a change gets diff = 1
# (the first row has a null lag(), so it falls through to 1 as well)
df = df.withColumn("prev", F.lag(df.data).over(my_window))
df = df.withColumn("diff", F.when(df.data == df.prev, 0).otherwise(1))
df = df.filter(df.diff == 1).drop("prev", "diff")
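This is equivalent to keeping the first row of every run of consecutive equal values. Note that the very first row is always kept (its lag() is null, so diff becomes 1) even when it is a RANGE= 0 record, so the result has one leading row more than the expected dataframe above. A plain-Python sketch with itertools.groupby makes that visible:

```python
from itertools import groupby

rows = [('9999', 'RANGE= 0', '14-02-2022 03:47'),
        ('9999', 'RANGE= 0', '14-02-2022 03:51'),
        ('9999', 'RANGE= 1', '14-02-2022 03:59'),
        ('9999', 'RANGE= 1', '14-02-2022 04:01'),
        ('9999', 'RANGE= 0', '14-02-2022 13:04'),
        ('9999', 'RANGE= 1', '14-02-2022 13:14'),
        ('9999', 'RANGE= 0', '14-02-2022 13:19')]

# keep the first row of each run of equal "data" values --
# the same rows the lag()/diff filter keeps
deduped = [next(group) for _, group in groupby(rows, key=lambda r: r[1])]
# the leading RANGE= 0 row (03:47) survives, unlike in the expected output
```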