我试图在pyspark
中使用熊猫的resample
技术,但没有得出任何结论。
+----------------------------+----+
| date | val|
+-----+---------------------------+
|2022-03-19T00:00:00.000+0000|14.0|
|2022-03-16T00:00:00.000+0000| 9.5|
|2022-03-13T00:00:00.000+0000|14.0|
|2022-03-10T00:00:00.000+0000| 1.0|
|2022-03-08T00:00:00.000+0000|24.0|
+-----+-------------------+----+--+
我有一个类似上面的数据集。然而,我想在pyspark中重新采样一个数据集,类似这样:
+----------------------------+----+
| date | val|
+-----+---------------------------+
|2022-03-19T00:00:00.000+0000|14.0|
|2022-03-18T00:00:00.000+0000|14.0|
|2022-03-17T00:00:00.000+0000|14.0|
|2022-03-16T00:00:00.000+0000| 9.5|
|2022-03-15T00:00:00.000+0000| 9.5|
|2022-03-14T00:00:00.000+0000| 9.5|
|2022-03-13T00:00:00.000+0000|14.0|
|2022-03-12T00:00:00.000+0000|14.0|
|2022-03-11T00:00:00.000+0000|14.0|
|2022-03-10T00:00:00.000+0000| 1.0|
|2022-03-09T00:00:00.000+0000| 1.0|
|2022-03-08T00:00:00.000+0000|24.0|
+-----+-------------------+----+--+
目标是按顺序排列日期,并用缺失的日期填充。
- 新的
val
列值应使用已存在的前一列的值填充
您可以首先获得每行可用的前一天,然后添加1天以获得前一天的下一天。
然后生成一个序列并爆炸:
from pyspark.sql import functions as F, Window as W
w = W.orderBy(F.desc("date")) #add .partitionBy(partitioncolumn)
out = df.withColumn("prev_",F.date_add(F.lead("date").over(w),1))
.withColumn("NewDate",
F.explode_outer(F.expr("sequence(date,prev_)"))
)
.withColumn("date",F.coalesce("NewDate","date")).select(*df.columns)
out.show(truncate=False)
+-------------------+----+
|date |val |
+-------------------+----+
|2022-03-19 05:30:00|14.0|
|2022-03-18 05:30:00|14.0|
|2022-03-17 05:30:00|14.0|
|2022-03-16 05:30:00|9.5 |
|2022-03-15 05:30:00|9.5 |
|2022-03-14 05:30:00|9.5 |
|2022-03-13 05:30:00|14.0|
|2022-03-12 05:30:00|14.0|
|2022-03-11 05:30:00|14.0|
|2022-03-10 05:30:00|1.0 |
|2022-03-09 05:30:00|1.0 |
|2022-03-08 05:30:00|24.0|
+-------------------+----+