我有以下DF:
|-----------------|
|Date | Cod |
|-----------------|
|2022-08-01 | A |
|2022-08-02 | A |
|2022-08-03 | A |
|2022-08-04 | A |
|2022-08-05 | A |
|2022-08-01 | B |
|2022-08-02 | B |
|2022-08-03 | B |
|2022-08-04 | B |
|2022-08-05 | B |
|-----------------|
考虑到我有2天的向后观察,我如何生成以下输出DF
|------------------------------|
|RefDate | Date | Cod
|------------------------------|
|2022-08-03 | 2022-08-01 | A |
|2022-08-03 | 2022-08-02 | A |
|2022-08-03 | 2022-08-03 | A |
|2022-08-04 | 2022-08-02 | A |
|2022-08-04 | 2022-08-03 | A |
|2022-08-04 | 2022-08-04 | A |
|2022-08-05 | 2022-08-03 | A |
|2022-08-05 | 2022-08-04 | A |
|2022-08-05 | 2022-08-05 | A |
|2022-08-03 | 2022-08-01 | B |
|2022-08-03 | 2022-08-02 | B |
|2022-08-03 | 2022-08-03 | B |
|2022-08-04 | 2022-08-02 | B |
|2022-08-04 | 2022-08-03 | B |
|2022-08-04 | 2022-08-04 | B |
|2022-08-05 | 2022-08-03 | B |
|2022-08-05 | 2022-08-04 | B |
|2022-08-05 | 2022-08-05 | B |
|------------------------------|
我知道我可以使用循环来生成这个输出DF,但循环的性能不好,因为我无法在内存中缓存DF(我原来的DF大约有60亿行(。那么,获得这种输出的最佳方式是什么呢?
MVCE:
data_1=[
("2022-08-01","A"),
("2022-08-02","A"),
("2022-08-03","A"),
("2022-08-04","A"),
("2022-08-05","A"),
("2022-08-01","B"),
("2022-08-02","B"),
("2022-08-03","B"),
("2022-08-04","B"),
("2022-08-05","B")
]
schema_1 = StructType([
StructField("Date", StringType(),True),
StructField("Cod", StringType(),True)
])
df_1 = spark.createDataFrame(data=data_1,schema=schema_1)
您可以尝试自联接我的想法-如果您的集群和会话被优化配置,它应该可以使用6B行
data_sdf.alias('a').
join(data_sdf.alias('b'),
[func.col('a.cod') == func.col('b.cod'),
func.datediff(func.col('a.date'), func.col('b.date')).between(0, 2)],
'inner'
).
drop(func.col('a.cod')).
selectExpr('cod', 'a.date as ref_date', 'b.date as date').
show()
# +---+----------+----------+
# |cod| ref_date| date|
# +---+----------+----------+
# | B|2022-08-01|2022-08-01|
# | B|2022-08-02|2022-08-01|
# | B|2022-08-02|2022-08-02|
# | B|2022-08-03|2022-08-01|
# | B|2022-08-03|2022-08-02|
# | B|2022-08-03|2022-08-03|
# | B|2022-08-04|2022-08-02|
# | B|2022-08-04|2022-08-03|
# | B|2022-08-04|2022-08-04|
# | B|2022-08-05|2022-08-03|
# | B|2022-08-05|2022-08-04|
# | B|2022-08-05|2022-08-05|
# | A|2022-08-01|2022-08-01|
# | A|2022-08-02|2022-08-01|
# | A|2022-08-02|2022-08-02|
# | A|2022-08-03|2022-08-01|
# | A|2022-08-03|2022-08-02|
# | A|2022-08-03|2022-08-03|
# | A|2022-08-04|2022-08-02|
# | A|2022-08-04|2022-08-03|
# +---+----------+----------+
# only showing top 20 rows
这也将生成最初两个日期的记录,这些记录可以被丢弃。