Merging two dataframes with a condition in PySpark


Df1:
+-------------------+
|               Date|
+-------------------+
|2020-07-01 00:00:00|
|2020-07-02 00:00:00|
|2020-07-03 00:00:00|
|2020-07-04 00:00:00|
|2020-07-05 00:00:00|
|2020-07-06 00:00:00|
|2020-07-07 00:00:00|
|2020-07-08 00:00:00|
|2020-07-09 00:00:00|
|2020-07-10 00:00:00|
+-------------------+
Df2:
+-------------------+----------+--------+--------+
|               Date|        ID|     Val|   f_val|
+-------------------+----------+--------+--------+
|2022-03-19 00:00:00|        34|     0.0|     0.0|
|2022-03-19 00:00:00|       108|     0.0|     0.0|
|2022-03-19 00:00:00|       155| 3070.61| 3070.61|
|2022-03-19 00:00:00|       193|22920.73|22920.73|
|2022-03-19 00:00:00|       211|   446.0|   446.0|
|2022-03-19 00:00:00|       321| 9314.16| 9314.16|
|2022-03-19 00:00:00|       362|  391.01|  391.01|
|2022-03-19 00:00:00|       368|    1.81|    1.81|
|2022-03-19 00:00:00|       375|    5.08|    5.08|
|2022-03-19 00:00:00|       530| 2444.76| 2444.76|
+-------------------+----------+--------+--------+

Both dataframes have a Date column that starts at 2022-03-19 and ends at 2020-07-01. Df1 contains the full series of unique dates from the end date back to the start date. Df2, however, is a fairly large dataset in which the same date appears on many rows on the way back to 2020-07-01. Df2's Date has only 186 distinct values, whereas Df1's Date has 626 distinct values. Using PySpark in Databricks, what I am trying to achieve is to merge Df1's Date column into Df2 so that all the dates missing from Df2 are included, and the newly added rows in Df2 should be filled with the values of the previous row.
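If the Df1 calendar itself ever needs to be rebuilt, a minimal sketch (assuming a Databricks SparkSession named spark and the two boundary dates mentioned above) could look like this:

from pyspark.sql.functions import col, explode, expr

# Hypothetical helper: one row per day between the boundary dates, as a midnight timestamp.
df1 = (
    spark.range(1)
         .select(explode(expr(
             "sequence(to_date('2020-07-01'), to_date('2022-03-19'), interval 1 day)"
         )).alias('Date'))
         .select(col('Date').cast('timestamp').alias('Date'))
)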

Below is a sample of the data when filtering by a single ID:

+-------------------+----------+--------+--------+
|               Date|        ID|     Val|   f_val|
+-------------------+----------+--------+--------+
|2022-03-11 00:00:00|   Ax3838J|    -8.0|81111.73|
|2022-03-07 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-02-27 00:00:00|   Ax3838J|    20.0|81149.67|
|2021-01-25 00:00:00|   Ax3838J|    40.0|81189.67|
|2021-10-22 00:00:00|   Ax3838J|   89.06|81278.73|
|2021-10-18 00:00:00|   Ax3838J|   10.89|81289.62|
|2021-10-15 00:00:00|   Ax3838J|    60.0|81349.62|
|2021-09-22 00:00:00|   Ax3838J|  -250.0|81099.62|
+-------------------+----------+--------+--------+

The final expected result is:

+-------------------+----------+--------+--------+
|               Date|        ID|     Val|   f_val|
+-------------------+----------+--------+--------+
|2022-03-11 00:00:00|   Ax3838J|    -8.0|81111.73|
|2022-03-10 00:00:00|   Ax3838J|    -8.0|81111.73|
|2022-03-09 00:00:00|   Ax3838J|    -8.0|81111.73|
|2022-03-08 00:00:00|   Ax3838J|    -8.0|81111.73|
|2022-03-07 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-03-06 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-03-05 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-03-04 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-03-03 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-03-02 00:00:00|   Ax3838J|   17.94|81129.67|
.                   .         .       .
.                   .         .       .
+-------------------+----------+--------+--------+
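One way to get there is to left join Df2 onto Df1 (which has a row for every date) and then forward-fill the null rows with last(..., ignorenulls=True) over a date-ordered window: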
from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

# First join the dataframes with a left join. Keep df1 on the left side, as it has a row for every date.
# Drop df2's duplicate Date column so that the window below can order by 'Date' unambiguously.
joined_df = df1.join(df2, df1.Date == df2.Date, "left").drop(df2.Date)

# Define the window: all rows from the start of the frame up to the current row, ordered by Date.
window = Window.orderBy('Date').rowsBetween(-sys.maxsize, 0)

# Define the forward-filled columns: the last non-null value seen so far.
filled_column_id = last(joined_df['ID'], ignorenulls=True).over(window)
filled_column_Val = last(joined_df['Val'], ignorenulls=True).over(window)
filled_column_fval = last(joined_df['f_val'], ignorenulls=True).over(window)

# Replace the existing columns with the filled columns.
joined_filled_df = (
    joined_df.withColumn('ID', filled_column_id)
             .withColumn('Val', filled_column_Val)
             .withColumn('f_val', filled_column_fval)
)
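Note that this window orders all rows by Date ascending and does not partition by ID, so with several IDs the values of different IDs bleed into each other, and a missing date inherits the nearest earlier row rather than the nearest later one shown in the expected output. A per-ID sketch (assuming every (Date, ID) pair should exist and that a gap should take the value of the nearest later date) could be:

from pyspark.sql import Window
from pyspark.sql.functions import desc, last

# Hypothetical variant for several IDs: build every (Date, ID) pair first,
# then left join df2 and fill within each ID, newest date first.
calendar_ids = df1.crossJoin(df2.select('ID').distinct())
joined_df = calendar_ids.join(df2, on=['Date', 'ID'], how='left')

per_id_window = (
    Window.partitionBy('ID')
          .orderBy(desc('Date'))
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

filled_df = (
    joined_df
    .withColumn('Val', last('Val', ignorenulls=True).over(per_id_window))
    .withColumn('f_val', last('f_val', ignorenulls=True).over(per_id_window))
)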
