PySpark DataFrame scenario:
- There is a DataFrame named DF. Its two key columns are ID and Date.
- Each ID has, on average, 40+ unique Dates (non-consecutive dates).
- There is a second DataFrame named DF_date with a single column, Date. Its dates lie between the minimum and maximum of Date in DF.
- The goal is to fill DF with consecutive dates from each unique ID's start date to its end date (the missing dates are to be filled via a left join between DF_date and DF; a sketch of this approach follows the expected output below).
DF
+-------------+-------------+----------------+
| Date| Val| ID|
+-------------+-------------+----------------+
| 2021-07-01| 81119.73| Ax3838J|
| 2021-07-04| 81289.62| Ax3838J|
| 2021-07-05| 81385.62| Ax3838J|
| 2021-07-02| 81249.76| Bz3838J|
| 2021-07-05| 81324.28| Bz3838J|
| 2021-07-06| 81329.28| Bz3838J|
+-------------+-------------+----------------+
DF_date
+-------------+
| Date|
+-------------+
| 2021-07-01|
| 2021-07-02|
| 2021-07-03|
| 2021-07-04|
| 2021-07-05|
| 2021-07-06|
+-------------+
Expected final output:
+-------------+-------------+----------------+
| Date| Val| ID|
+-------------+-------------+----------------+
| 2021-07-01| 81119.73| Ax3838J|
| 2021-07-02| 81119.73| Ax3838J|
| 2021-07-03| 81119.73| Ax3838J|
| 2021-07-04| 81289.62| Ax3838J|
| 2021-07-05| 81385.62| Ax3838J|
| 2021-07-02| 81249.76| Bz3838J|
| 2021-07-03| 81249.76| Bz3838J|
| 2021-07-04| 81249.76| Bz3838J|
| 2021-07-05| 81324.28| Bz3838J|
| 2021-07-06| 81329.28| Bz3838J|
+-------------+-------------+----------------+
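For reference, here is a minimal sketch of the join approach described above: restrict DF_date to each ID's own date range, left-join DF onto that calendar, and forward-fill Val. The intermediate names bounds, calendar, and filled are mine, and the sketch assumes DF and DF_date exist as shown:

from pyspark.sql import Window
from pyspark.sql import functions as F

# per-ID date bounds taken from DF itself
bounds = DF.groupBy('ID').agg(F.min('Date').alias('min_date'),
                              F.max('Date').alias('max_date'))

# keep only the calendar dates that fall inside each ID's range
calendar = (bounds.join(DF_date, (DF_date.Date >= bounds.min_date)
                                 & (DF_date.Date <= bounds.max_date))
                  .select('ID', 'Date'))

# the left join attaches the known Val rows; the window forward-fills the gaps
filled = (calendar.join(DF, ['ID', 'Date'], 'left')
                  .withColumn('Val', F.last('Val', ignorenulls=True)
                               .over(Window.partitionBy('ID').orderBy('Date'))))
filled.show()

On the sample data this should reproduce the expected output above; ISO-formatted date strings compare correctly even before casting to DateType.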
Your question doesn't quite make sense. Why have a DF_date dataframe spanning the overall start and end dates, use it to fill in dates, and then restrict to each ID's start and end from DF? Why not simply use the minimum and maximum date per group in DF to fill in the missing dates?
Anyway, here is how to fill in the missing dates based on DF_date.
Following your comment, see my edit:
from pyspark.sql import Window
from pyspark.sql.functions import collect_list, first, last, to_date

new = (DF.groupby('ID')
       # first/last date per group (assumes rows arrive ordered by Date within each ID)
       .agg(to_date(first('Date')).alias('min_date'),
            to_date(last('Date')).alias('max_date'),
            # collect Date and Val into one array per group
            *[collect_list(i).alias(f"{i}") for i in DF.drop('ID').columns])
       # zip the arrays with the full date sequence and explode; the unnamed
       # sequence lands in a column named '2', which becomes the new Date
       .selectExpr("ID", "inline(arrays_zip(Date, Val, sequence(min_date, max_date, interval 1 day)))")
       .drop('Date').withColumnRenamed('2', 'Date')
       # forward-fill the Val column down each ID, ordered by Date
       .withColumn('Val', last('Val', True).over(Window.partitionBy('ID').orderBy('Date')))
       )
new.show()
+-------+--------+----------+
| ID| Val| Date|
+-------+--------+----------+
|Ax3838J|81119.73|2021-07-01|
|Ax3838J|81289.62|2021-07-02|
|Ax3838J|81385.62|2021-07-03|
|Ax3838J|81385.62|2021-07-04|
|Ax3838J|81385.62|2021-07-05|
|Bz3838J|81249.76|2021-07-02|
|Bz3838J|81324.28|2021-07-03|
|Bz3838J|81329.28|2021-07-04|
|Bz3838J|81329.28|2021-07-05|
|Bz3838J|81329.28|2021-07-06|
+-------+--------+----------+
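One caveat on the code above: first and last inside an aggregation depend on the incoming row order, so min_date and max_date are only reliable if each group's rows already arrive sorted by Date. An order-independent variant (a sketch, reusing the same column names) swaps them for min and max:

from pyspark.sql import functions as F

# deterministic per-group bounds, regardless of row order
bounds = DF.groupby('ID').agg(F.to_date(F.min('Date')).alias('min_date'),
                              F.to_date(F.max('Date')).alias('max_date'))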
Regarding the question above, I later realized that, as @wwnde suggested, there is no need to create a separate DataFrame for the dates.
The code below serves the purpose as well -
from pyspark.sql import Window
from pyspark.sql import functions as F

# partition the data by ID and order by DATE
window_fn = Window.partitionBy("ID").orderBy("DATE")

# the range of dates runs from the DATE in the current row up to (but not
# including) the DATE in the following row; for the last row per ID, fall
# back to the row's own DATE
next_date = F.coalesce(F.lead("DATE", 1).over(window_fn),
                       F.col("DATE") + F.expr("interval 1 day"))
end_date_range = next_date - F.expr("interval 1 day")

# use 'sequence' to generate all intermediate dates, then explode the
# array to fill in rows for the missing dates (assumes DATE is a date column)
final_result = (DF.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 day")))
                .withColumn("DATE", F.explode("Ranges"))
                .withColumn("DATE", F.to_timestamp("DATE", "yyyy-MM-dd"))
                .drop("Ranges"))
display(final_result)
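Note that display is a Databricks notebook helper; in plain PySpark the equivalent check would be:

final_result.orderBy('ID', 'DATE').show(truncate=False)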