根据pyspark中其他列的日期范围检索每个分区的数据



PySpark中有一个DataFrame,数据如下:

原始数据:

Shop Customer date        retrive_days
A    C1       15/06/2019  2
A    C1       16/06/2019  0
A    C1       17/06/2019  0
A    C1       18/06/2019  0
B    C2       20/07/2019  5
B    C2       21/07/2019  0
B    C2       23/07/2019  0
B    C2       30/07/2019  0
B    C2       01/08/2019  6
B    C2       02/08/2019  0
B    C2       03/08/2019  0
B    C2       09/08/2019  0
B    C2       10/08/2019  1
B    C2       11/08/2019  0
B    C2       13/08/2019  0

每个客户都有一个他/她访问商店的日期,每个客户也有retive_day,并且必须将天数数据提取到输出中。

我正试图在PySpark中获得一个输出,它应该是这样的,根据每个客户的retive_days值进行过滤

预期输出:

Shop Customer date        retrive_days
A    C1       15/06/2019  2
A    C1       16/06/2019  0
B    C2       20/07/2019  5
B    C2       21/07/2019  0
B    C2       23/07/2019  0
B    C2       01/08/2019  6
B    C2       02/08/2019  0
B    C2       03/08/2019  0
B    C2       10/08/2019  1
B    C2       11/08/2019  0

尝试使用window functions

在示例输出中,最后一行应该被省略,因为对于其他2,5,6逻辑,日期不应该等于最大日期(retive_days+date(。如果不是这样,请执行filter('date1<=max_date')

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w1=Window().partitionBy("Shop","Customer").orderBy("date1")
w2=Window().partitionBy("Shop","Customer","partitions")
df.withColumn("date1", F.to_date("date", "dd/MM/yyyy"))
.withColumn("partitions", F.sum(F.expr("""IF(retrive_days!=0, 1, 0)""")).over(w1))
.withColumn("max_date", F.max(F.expr("""IF(retrive_days!=0,date_add(date1,retrive_days),null)""")).over(w2))
.filter('date1<max_date').drop("date1","max_date","partitions").show()
#+----+--------+----------+------------+
#|Shop|Customer|      date|retrive_days|
#+----+--------+----------+------------+
#|   A|      C1|15/06/2019|           2|
#|   A|      C1|16/06/2019|           0|
#|   B|      C2|20/07/2019|           5|
#|   B|      C2|21/07/2019|           0|
#|   B|      C2|23/07/2019|           0|
#|   B|      C2|01/08/2019|           6|
#|   B|      C2|02/08/2019|           0|
#|   B|      C2|03/08/2019|           0|
#|   B|      C2|10/08/2019|           1|
#+----+--------+----------+------------+

最新更新