Df1:
+-------------------+
| Date|
+-------------------+
|2020-07-01 00:00:00|
|2020-07-02 00:00:00|
|2020-07-03 00:00:00|
|2020-07-04 00:00:00|
|2020-07-05 00:00:00|
|2020-07-06 00:00:00|
|2020-07-07 00:00:00|
|2020-07-08 00:00:00|
|2020-07-09 00:00:00|
|2020-07-10 00:00:00|
+-------------------+
Df2:
+-------------------+----------+--------+--------+
| Date| ID| Val| f_val|
+-------------------+----------+--------+--------+
|2022-03-19 00:00:00| 34| 0.0| 0.0|
|2022-03-19 00:00:00| 108| 0.0| 0.0|
|2022-03-19 00:00:00| 155| 3070.61| 3070.61|
|2022-03-19 00:00:00| 193|22920.73|22920.73|
|2022-03-19 00:00:00| 211| 446.0| 446.0|
|2022-03-19 00:00:00| 321| 9314.16| 9314.16|
|2022-03-19 00:00:00| 362| 391.01| 391.01|
|2022-03-19 00:00:00| 368| 1.81| 1.81|
|2022-03-19 00:00:00| 375| 5.08| 5.08|
|2022-03-19 00:00:00| 530| 2444.76| 2444.76|
+-------------------+----------+--------+--------+
Both data frames have a Date column that starts at 2022-03-19 and runs back to 2020-07-01. Df1 contains a series of unique dates from end to start. Df2, however, is a fairly large dataset in which the same date appears on multiple rows, going back to 2020-07-01. Df2's Date column has only 186 distinct values, while Df1's Date column has 626 distinct values. Using PySpark in Databricks, what I am trying to achieve is to merge Df1 and Df2 on the Date column so that all the dates missing from Df2 are included, and the newly added rows in Df2 are filled with the values of the previous row.
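For context, a calendar frame like Df1 can be generated directly in PySpark if one is not already at hand. A minimal sketch, assuming the boundary dates quoted above (the literals here are illustrative, not part of the original pipeline):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Build one row per day between the two boundary dates (inclusive),
# then cast to timestamp to match the Date column shown above.
df1 = (
    spark.sql("SELECT explode(sequence(to_date('2020-07-01'), to_date('2022-03-19'), interval 1 day)) AS Date")
    .withColumn("Date", F.col("Date").cast("timestamp"))
)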
Here is a sample of the data when filtering on a single ID:
+-------------------+----------+--------+--------+
| Date| ID| Val| f_val|
+-------------------+----------+--------+--------+
|2022-03-11 00:00:00| Ax3838J| -8.0|81111.73|
|2022-03-07 00:00:00| Ax3838J| 17.94|81129.67|
|2022-02-27 00:00:00| Ax3838J| 20.0|81149.67|
|2021-01-25 00:00:00| Ax3838J| 40.0|81189.67|
|2021-10-22 00:00:00| Ax3838J| 89.06|81278.73|
|2021-10-18 00:00:00| Ax3838J| 10.89|81289.62|
|2021-10-15 00:00:00| Ax3838J| 60.0|81349.62|
|2021-09-22 00:00:00| Ax3838J| -250.0|81099.62|
+-------------------+----------+--------+--------+
The final desired output is:
+-------------------+----------+--------+--------+
| Date| ID| Val| f_val|
+-------------------+----------+--------+--------+
|2022-03-11 00:00:00| Ax3838J| -8.0|81111.73|
|2022-03-10 00:00:00| Ax3838J| -8.0|81111.73|
|2022-03-09 00:00:00| Ax3838J| -8.0|81111.73|
|2022-03-08 00:00:00| Ax3838J| -8.0|81111.73|
|2022-03-07 00:00:00| Ax3838J| 17.94|81129.67|
|2022-03-06 00:00:00| Ax3838J| 17.94|81129.67|
|2022-03-05 00:00:00| Ax3838J| 17.94|81129.67|
|2022-03-04 00:00:00| Ax3838J| 17.94|81129.67|
|2022-03-03 00:00:00| Ax3838J| 17.94|81129.67|
|2022-03-02 00:00:00| Ax3838J| 17.94|81129.67|
. . . .
. . . .
+-------------------+----------+--------+--------+
from pyspark.sql import Window
from pyspark.sql.functions import last

# First join the data frames with a left join. Keep df1 on the left side, as it contains rows for all the dates.
joined_df = df1.join(df2, on="Date", how="left")

# Define the window: every row from the start of the ordering up to the current row.
window = Window.orderBy("Date").rowsBetween(Window.unboundedPreceding, 0)

# Define the forward-filled columns: take the last non-null value seen so far.
filled_column_id = last(joined_df["ID"], ignorenulls=True).over(window)
filled_column_val = last(joined_df["Val"], ignorenulls=True).over(window)
filled_column_fval = last(joined_df["f_val"], ignorenulls=True).over(window)

# Replace the existing columns with the filled columns.
joined_filled_df = (
    joined_df
    .withColumn("ID", filled_column_id)
    .withColumn("Val", filled_column_val)
    .withColumn("f_val", filled_column_fval)
)
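Note that the window above orders the whole result by Date. If Df2 holds many IDs, and each gap should be filled per ID with the values of the nearest later actual date (as in the desired output above, where 2022-03-10 takes the 2022-03-11 values), the calendar would first need to be expanded to one row per (Date, ID) pair and the window partitioned by ID. A rough sketch under those assumptions:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Expand the calendar to one row per (Date, ID) pair, then left-join the data.
calendar = df1.crossJoin(df2.select("ID").distinct())
joined = calendar.join(df2, on=["Date", "ID"], how="left")

# Fill within each ID, walking from the newest date downwards so that a
# missing date picks up the values of the nearest later actual row.
w = (
    Window.partitionBy("ID")
    .orderBy(F.desc("Date"))
    .rowsBetween(Window.unboundedPreceding, 0)
)

filled = (
    joined
    .withColumn("Val", F.last("Val", ignorenulls=True).over(w))
    .withColumn("f_val", F.last("f_val", ignorenulls=True).over(w))
)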