在pyspark中逐日期重新采样日期时间



我试图在pyspark中使用熊猫的resample技术,但没有得出任何结论。

+----------------------------+----+
|               date         | val|
+-----+---------------------------+
|2022-03-19T00:00:00.000+0000|14.0|
|2022-03-16T00:00:00.000+0000| 9.5|
|2022-03-13T00:00:00.000+0000|14.0|
|2022-03-10T00:00:00.000+0000| 1.0|
|2022-03-08T00:00:00.000+0000|24.0|
+-----+-------------------+----+--+

我有一个类似上面的数据集。然而,我想在pyspark中重新采样一个数据集,类似这样:

+----------------------------+----+
|               date         | val|
+-----+---------------------------+
|2022-03-19T00:00:00.000+0000|14.0|
|2022-03-18T00:00:00.000+0000|14.0|
|2022-03-17T00:00:00.000+0000|14.0|
|2022-03-16T00:00:00.000+0000| 9.5|
|2022-03-15T00:00:00.000+0000| 9.5|
|2022-03-14T00:00:00.000+0000| 9.5|
|2022-03-13T00:00:00.000+0000|14.0|
|2022-03-12T00:00:00.000+0000|14.0|
|2022-03-11T00:00:00.000+0000|14.0|
|2022-03-10T00:00:00.000+0000| 1.0|
|2022-03-09T00:00:00.000+0000| 1.0|
|2022-03-08T00:00:00.000+0000|24.0|
+-----+-------------------+----+--+

目标是按顺序排列日期,并用缺失的日期填充。

  • 新的val列值应使用已存在的前一列的值填充

您可以首先获得每行可用的前一天,然后添加1天以获得前一天的下一天。

然后生成一个序列并爆炸:

from pyspark.sql import functions as F, Window as W
w = W.orderBy(F.desc("date")) #add .partitionBy(partitioncolumn)
out = df.withColumn("prev_",F.date_add(F.lead("date").over(w),1))
.withColumn("NewDate",
F.explode_outer(F.expr("sequence(date,prev_)"))
)
.withColumn("date",F.coalesce("NewDate","date")).select(*df.columns)

out.show(truncate=False)
+-------------------+----+
|date               |val |
+-------------------+----+
|2022-03-19 05:30:00|14.0|
|2022-03-18 05:30:00|14.0|
|2022-03-17 05:30:00|14.0|
|2022-03-16 05:30:00|9.5 |
|2022-03-15 05:30:00|9.5 |
|2022-03-14 05:30:00|9.5 |
|2022-03-13 05:30:00|14.0|
|2022-03-12 05:30:00|14.0|
|2022-03-11 05:30:00|14.0|
|2022-03-10 05:30:00|1.0 |
|2022-03-09 05:30:00|1.0 |
|2022-03-08 05:30:00|24.0|
+-------------------+----+

相关内容

  • 没有找到相关文章

最新更新