PySpark conditional formatting of the final DataFrame after joining two DataFrames



PySpark DataFrame scenario:

  1. There is a DataFrame named DF. The two key columns of DF are ID and Date.
  2. Each ID has, on average, 40+ unique Dates (non-consecutive dates).
  3. There is a second DataFrame named DF_date with a single column called Date. The dates in DF_date fall between the minimum and maximum of Date in DF.
  4. The goal is to fill DF with consecutive dates from the start to the end date for each unique ID (the missing dates are filled via a left join between DF_date and DF).

DF

+-------------+-------------+----------------+
|         Date|          Val|              ID|
+-------------+-------------+----------------+
|   2021-07-01|     81119.73|         Ax3838J|
|   2021-07-04|     81289.62|         Ax3838J|
|   2021-07-05|     81385.62|         Ax3838J|
|   2021-07-02|     81249.76|         Bz3838J|
|   2021-07-05|     81324.28|         Bz3838J|
|   2021-07-06|     81329.28|         Bz3838J|
+-------------+-------------+----------------+ 

DF_date

+-------------+
|         Date|
+-------------+
|   2021-07-01|
|   2021-07-02|
|   2021-07-03|
|   2021-07-04|
|   2021-07-05|
|   2021-07-06|
+-------------+
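
For reference, both inputs can be reproduced with a minimal sketch like the one below (an assumption on my part, not part of the original post: Date is kept as a 'yyyy-MM-dd' string, which PySpark orders correctly; cast to DateType if your pipeline needs real dates):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# DF: non-consecutive dates per ID
DF = spark.createDataFrame(
    [("2021-07-01", 81119.73, "Ax3838J"),
     ("2021-07-04", 81289.62, "Ax3838J"),
     ("2021-07-05", 81385.62, "Ax3838J"),
     ("2021-07-02", 81249.76, "Bz3838J"),
     ("2021-07-05", 81324.28, "Bz3838J"),
     ("2021-07-06", 81329.28, "Bz3838J")],
    ["Date", "Val", "ID"])

# DF_date: every calendar date between the min and max Date of DF
DF_date = spark.createDataFrame(
    [(f"2021-07-0{d}",) for d in range(1, 7)], ["Date"])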

Expected final output:

+-------------+-------------+----------------+
|         Date|          Val|              ID|
+-------------+-------------+----------------+
|   2021-07-01|     81119.73|         Ax3838J|
|   2021-07-02|     81119.73|         Ax3838J|
|   2021-07-03|     81119.73|         Ax3838J|
|   2021-07-04|     81289.62|         Ax3838J|
|   2021-07-05|     81385.62|         Ax3838J|
|   2021-07-02|     81249.76|         Bz3838J|
|   2021-07-03|     81249.76|         Bz3838J|
|   2021-07-04|     81249.76|         Bz3838J|
|   2021-07-05|     81324.28|         Bz3838J|
|   2021-07-06|     81329.28|         Bz3838J|
+-------------+-------------+----------------+ 

Your question doesn't quite make sense. Why have a DF_date DataFrame spanning the start and end dates, use it to fill in the dates, and then fall back on DF's own start and end dates anyway? Why not simply use each group's minimum and maximum date in DF to fill in the missing dates?

Anyway, here is how to fill in the missing dates (using each group's own date range rather than DF_date):

Based on your comment, see my edit below.

from pyspark.sql import Window
from pyspark.sql.functions import to_date, first, last, collect_list

new = (DF.groupby('ID')
       # first/last assume the data arrives ordered by Date within each group;
       # min/max would be the order-independent choice
       .agg(to_date(first('Date')).alias('min_date'),   # minimum date per group
            to_date(last('Date')).alias('max_date'),    # maximum date per group
            *[collect_list(i).alias(i) for i in DF.drop('ID').columns])  # Date and Val arrays per group
       # Zip the (shorter) Date/Val arrays against the full date sequence and explode;
       # the unnamed sequence column comes out as '2', which becomes the new Date
       .selectExpr("ID", "inline(arrays_zip(Date, Val, sequence(min_date, max_date, interval 1 day)))")
       .drop('Date').withColumnRenamed('2', 'Date')
       # Forward-fill the Val column
       .withColumn('Val', last('Val', True).over(Window.partitionBy('ID').orderBy('Date'))))
new.show()
+-------+--------+----------+
|     ID|     Val|      Date|
+-------+--------+----------+
|Ax3838J|81119.73|2021-07-01|
|Ax3838J|81289.62|2021-07-02|
|Ax3838J|81385.62|2021-07-03|
|Ax3838J|81385.62|2021-07-04|
|Ax3838J|81385.62|2021-07-05|
|Bz3838J|81249.76|2021-07-02|
|Bz3838J|81324.28|2021-07-03|
|Bz3838J|81329.28|2021-07-04|
|Bz3838J|81329.28|2021-07-05|
|Bz3838J|81329.28|2021-07-06|
+-------+--------+----------+
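
For completeness, the left join the question literally describes (DF_date joined onto DF, then forward-filled) can be sketched as below. This is my sketch, not part of either answer; bounds, calendar, w and filled are hypothetical names, and Date is assumed comparable (string in 'yyyy-MM-dd' form or a real date):

import pyspark.sql.functions as F
from pyspark.sql import Window

# Per-ID date bounds taken from DF itself
bounds = DF.groupBy("ID").agg(F.min("Date").alias("min_d"),
                              F.max("Date").alias("max_d"))

# One row per (ID, Date) for every calendar date inside each ID's range
calendar = (DF_date.crossJoin(bounds)
            .where(F.col("Date").between(F.col("min_d"), F.col("max_d")))
            .select("ID", "Date"))

# Left-join the known values onto the calendar, then forward-fill Val
w = Window.partitionBy("ID").orderBy("Date")
filled = (calendar.join(DF, ["ID", "Date"], "left")
          .withColumn("Val", F.last("Val", ignorenulls=True).over(w)))
filled.orderBy("ID", "Date").show()

On the sample data this reproduces the expected output exactly.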

For the question above, I later realized that, as @wwnde suggested, there is no need to create a separate DataFrame for the dates.

The code below serves the purpose as well:

import pyspark.sql.functions as F
from pyspark.sql import Window

# Partition the data by ID and order by DATE
window_fn = Window.partitionBy("ID").orderBy("DATE")

# The range of dates between the DATE in the current row and the following row;
# the last row of each partition falls back to a single-day range
next_date = F.coalesce(F.lead("DATE", 1).over(window_fn),
                       F.col("DATE") + F.expr("interval 1 day"))
end_date_range = next_date - F.expr("interval 1 day")

# Use 'sequence' to generate all intermediate dates, then explode
# that array to fill in rows for the missing dates
final_result = (DF.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 day")))
                .withColumn("DATE", F.explode("Ranges"))
                .withColumn("DATE", F.to_timestamp("DATE", "yyyy-MM-dd"))
                .drop("Ranges"))
display(final_result)  # Databricks display(); use final_result.show() elsewhere
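
On the sample DF above this fills in the same rows as the expected output (with DATE as a timestamp), and no separate DF_date is needed. One caveat: DATE must already be a date or timestamp column for sequence and the interval arithmetic to work; if it is a string, convert it first (e.g. with F.to_date).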
