PySpark中有一个DataFrame,数据如下:
原始数据:
Shop Customer date retrive_days
A C1 15/06/2019 2
A C1 16/06/2019 0
A C1 17/06/2019 0
A C1 18/06/2019 0
B C2 20/07/2019 5
B C2 21/07/2019 0
B C2 23/07/2019 0
B C2 30/07/2019 0
B C2 01/08/2019 6
B C2 02/08/2019 0
B C2 03/08/2019 0
B C2 09/08/2019 0
B C2 10/08/2019 1
B C2 11/08/2019 0
B C2 13/08/2019 0
每个客户都有一个他/她访问商店的日期,每个客户也有retive_day,并且必须将天数数据提取到输出中。
我正试图在PySpark中获得一个输出,它应该是这样的,根据每个客户的retive_days值进行过滤
预期输出:
Shop Customer date retrive_days
A C1 15/06/2019 2
A C1 16/06/2019 0
B C2 20/07/2019 5
B C2 21/07/2019 0
B C2 23/07/2019 0
B C2 01/08/2019 6
B C2 02/08/2019 0
B C2 03/08/2019 0
B C2 10/08/2019 1
B C2 11/08/2019 0
尝试使用window functions
。
在示例输出中,最后一行应该被省略,因为对于其他2,5,6逻辑,日期不应该等于最大日期(retive_days+date(。如果不是这样,请执行filter('date1<=max_date')
。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w1=Window().partitionBy("Shop","Customer").orderBy("date1")
w2=Window().partitionBy("Shop","Customer","partitions")
df.withColumn("date1", F.to_date("date", "dd/MM/yyyy"))
.withColumn("partitions", F.sum(F.expr("""IF(retrive_days!=0, 1, 0)""")).over(w1))
.withColumn("max_date", F.max(F.expr("""IF(retrive_days!=0,date_add(date1,retrive_days),null)""")).over(w2))
.filter('date1<max_date').drop("date1","max_date","partitions").show()
#+----+--------+----------+------------+
#|Shop|Customer| date|retrive_days|
#+----+--------+----------+------------+
#| A| C1|15/06/2019| 2|
#| A| C1|16/06/2019| 0|
#| B| C2|20/07/2019| 5|
#| B| C2|21/07/2019| 0|
#| B| C2|23/07/2019| 0|
#| B| C2|01/08/2019| 6|
#| B| C2|02/08/2019| 0|
#| B| C2|03/08/2019| 0|
#| B| C2|10/08/2019| 1|
#+----+--------+----------+------------+