合并两个Spark数据帧并添加新列以标识最新日期



假设我有两个数据帧

第一个数据帧具有value列的值(每个id唯一(

id   date          value  some_other_columns...
1    2020-10-01    'a'    
2    2020-09-30    'b'    
2    2020-10-01    'b'
3    2020-10-01    'c'

第二个数据帧具有value列的空值

id   date          value  some_other_columns...
1    2020-10-02    NULL
2    2020-10-02    NULL
4    2020-10-02    NULL
5    2020-10-02    NULL
6    2020-10-02    NULL

我想合并这两个数据帧,并创建一个新列is_active来确定按id分组的最新日期(但保留其他列(,如果第一个数据帧中存在id,则从第二个数据帧中分配value

id   date          value  some_other_columns... is_active
1    2020-10-01    'a'                          0
1    2020-10-02    'a'                          1
2    2020-09-30    'b'                          0
2    2020-10-01    'b'                          0
2    2020-10-02    'b'                          1
3    2020-10-01    'c'                          1
4    2020-10-02    NULL                         1
5    2020-10-02    NULL                         1
6    2020-10-02    NULL                         1

假设您的两个数据帧分别为df_1df_2。为了将values分配给df_2,可以从df_1执行left join

from pyspark.sql.functions import *
df_1_ = df_1.select("id", "value").withColumnRenamed("id", "id_1")
df_2 = df_2.drop("value").join(df_1_, (df_2.id == df_1_.id_1), "left")
.drop("id_1").distinct()
df_2.show()
+---+----------+-----+                                                          
| id|      date|value|
+---+----------+-----+
|  1|2020-10-02|    a|
|  2|2020-10-02|    b|
|  4|2020-10-02| null|
|  5|2020-10-02| null|
|  6|2020-10-02| null|
+---+----------+-----+

现在要获得is_active列,您可以联合然后使用窗口函数(row_number()rank(),具体取决于您的需要(:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('id').orderBy(desc("date"))
df_1.union(df_2).withColumn("is_active", F.when(row_number().over(w)==1, 1)
.otherwise(0)).orderBy("id", "date").show()
+---+----------+-----+---------+                                                
| id|      date|value|is_active|
+---+----------+-----+---------+
|  1|2020-10-01|    a|        0|
|  1|2020-10-02|    a|        1|
|  2|2020-09-30|    b|        0|
|  2|2020-10-01|    b|        0|
|  2|2020-10-02|    b|        1|
|  3|2020-10-01|    c|        1|
|  4|2020-10-02| null|        1|
|  5|2020-10-02| null|        1|
|  6|2020-10-02| null|        1|
+---+----------+-----+---------+

最新更新