Create a DataFrame based on an X-day lookback window



I have the following DF:

|-----------------|
|Date       | Cod |
|-----------------|
|2022-08-01 |  A  | 
|2022-08-02 |  A  |
|2022-08-03 |  A  |
|2022-08-04 |  A  |
|2022-08-05 |  A  |
|2022-08-01 |  B  |
|2022-08-02 |  B  |
|2022-08-03 |  B  |
|2022-08-04 |  B  |
|2022-08-05 |  B  |
|-----------------|

Given a lookback of 2 days, how can I generate the following output DF?

|------------------------------|
|RefDate    | Date        | Cod|
|------------------------------|
|2022-08-03 |  2022-08-01 |  A |
|2022-08-03 |  2022-08-02 |  A |
|2022-08-03 |  2022-08-03 |  A |
|2022-08-04 |  2022-08-02 |  A |
|2022-08-04 |  2022-08-03 |  A |
|2022-08-04 |  2022-08-04 |  A |
|2022-08-05 |  2022-08-03 |  A |
|2022-08-05 |  2022-08-04 |  A |
|2022-08-05 |  2022-08-05 |  A |
|2022-08-03 |  2022-08-01 |  B |
|2022-08-03 |  2022-08-02 |  B |
|2022-08-03 |  2022-08-03 |  B |
|2022-08-04 |  2022-08-02 |  B |
|2022-08-04 |  2022-08-03 |  B |
|2022-08-04 |  2022-08-04 |  B |
|2022-08-05 |  2022-08-03 |  B |
|2022-08-05 |  2022-08-04 |  B |
|2022-08-05 |  2022-08-05 |  B |
|------------------------------|

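I know I could use a loop to generate this output DF, but a loop performs poorly because I cannot cache the DF in memory (my original DF has about 6 billion rows). So what is the best way to get this output?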

MVCE:

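from pyspark.sql.types import StructType, StructField, StringType

# `spark` is assumed to be an existing SparkSession
data_1 = [
    ("2022-08-01", "A"),
    ("2022-08-02", "A"),
    ("2022-08-03", "A"),
    ("2022-08-04", "A"),
    ("2022-08-05", "A"),
    ("2022-08-01", "B"),
    ("2022-08-02", "B"),
    ("2022-08-03", "B"),
    ("2022-08-04", "B"),
    ("2022-08-05", "B"),
]
# Both columns are plain strings; dates are in yyyy-MM-dd format
schema_1 = StructType([
    StructField("Date", StringType(), True),
    StructField("Cod", StringType(), True),
])
df_1 = spark.createDataFrame(data=data_1, schema=schema_1)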

You can try a self-join. My thinking is that if your cluster and session are configured optimally, it should work with 6B rows.

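import pyspark.sql.functions as func

# data_sdf is the input DataFrame (df_1 in the question)
(
    data_sdf.alias('a')
    .join(
        data_sdf.alias('b'),
        [func.col('a.cod') == func.col('b.cod'),
         # keep rows of b dated 0 to 2 days before a's date
         func.datediff(func.col('a.date'), func.col('b.date')).between(0, 2)],
        'inner'
    )
    .drop(func.col('a.cod'))
    .selectExpr('cod', 'a.date as ref_date', 'b.date as date')
    .show()
)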
# +---+----------+----------+
# |cod|  ref_date|      date|
# +---+----------+----------+
# |  B|2022-08-01|2022-08-01|
# |  B|2022-08-02|2022-08-01|
# |  B|2022-08-02|2022-08-02|
# |  B|2022-08-03|2022-08-01|
# |  B|2022-08-03|2022-08-02|
# |  B|2022-08-03|2022-08-03|
# |  B|2022-08-04|2022-08-02|
# |  B|2022-08-04|2022-08-03|
# |  B|2022-08-04|2022-08-04|
# |  B|2022-08-05|2022-08-03|
# |  B|2022-08-05|2022-08-04|
# |  B|2022-08-05|2022-08-05|
# |  A|2022-08-01|2022-08-01|
# |  A|2022-08-02|2022-08-01|
# |  A|2022-08-02|2022-08-02|
# |  A|2022-08-03|2022-08-01|
# |  A|2022-08-03|2022-08-02|
# |  A|2022-08-03|2022-08-03|
# |  A|2022-08-04|2022-08-02|
# |  A|2022-08-04|2022-08-03|
# +---+----------+----------+
# only showing top 20 rows

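This will also generate records for the first two dates (ref_date 2022-08-01 and 2022-08-02), whose lookback windows are incomplete; those records can be discarded.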
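To make the discard rule concrete, here is a minimal pure-Python sketch of the same logic on the sample data. It is only meant as a small reference for checking the Spark output on a sample, not something to run on 6B rows. The function name `lookback_rows` and the constant `LOOKBACK_DAYS` are hypothetical names introduced for illustration, and the sketch assumes "the first two dates" means any ref date earlier than the first date of its cod plus the lookback.

```python
from collections import defaultdict
from datetime import date, timedelta

LOOKBACK_DAYS = 2  # assumption: same 2-day lookback as in the question


def lookback_rows(rows, lookback=LOOKBACK_DAYS):
    """Return (ref_date, date, cod) tuples, skipping the first `lookback` days per cod."""
    dates_by_cod = defaultdict(set)
    for d, cod in rows:
        dates_by_cod[cod].add(date.fromisoformat(d))

    out = []
    for cod in sorted(dates_by_cod):
        dates = sorted(dates_by_cod[cod])
        first_full_ref = dates[0] + timedelta(days=lookback)
        for ref in dates:
            # Discard ref dates whose window would start before the data does
            if ref < first_full_ref:
                continue
            for d in dates:
                # Same condition as the join: datediff(ref, d) between 0 and lookback
                if 0 <= (ref - d).days <= lookback:
                    out.append((ref.isoformat(), d.isoformat(), cod))
    return out


sample = [(f"2022-08-0{i}", cod) for cod in "AB" for i in range(1, 6)]
result = lookback_rows(sample)
for row in result:
    print(row)
```

On the sample data this keeps only ref dates 2022-08-03 through 2022-08-05 for each cod. In Spark, one way to express the same cut would be to filter the joined result on ref_date being at least the per-cod minimum date plus 2 days, for example with `func.min` over a window partitioned by cod and `func.date_add`; that is a suggestion, not part of the original answer.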
