假设我有两个数据帧
第一个数据帧具有value
列的值(每个id唯一(
id date value some_other_columns...
1 2020-10-01 'a'
2 2020-09-30 'b'
2 2020-10-01 'b'
3 2020-10-01 'c'
第二个数据帧具有value
列的空值
id date value some_other_columns...
1 2020-10-02 NULL
2 2020-10-02 NULL
4 2020-10-02 NULL
5 2020-10-02 NULL
6 2020-10-02 NULL
我想合并这两个数据帧,并创建一个新列is_active
来确定按id分组的最新日期(但保留其他列(,如果第一个数据帧中存在id
,则从第二个数据帧中分配value
id date value some_other_columns... is_active
1 2020-10-01 'a' 0
1 2020-10-02 'a' 1
2 2020-09-30 'b' 0
2 2020-10-01 'b' 0
2 2020-10-02 'b' 1
3 2020-10-01 'c' 1
4 2020-10-02 NULL 1
5 2020-10-02 NULL 1
6 2020-10-02 NULL 1
假设您的两个数据帧分别为df_1
和df_2
。为了将values
分配给df_2
,可以从df_1
执行left join
。
from pyspark.sql.functions import *
df_1_ = df_1.select("id", "value").withColumnRenamed("id", "id_1")
df_2 = df_2.drop("value").join(df_1_, (df_2.id == df_1_.id_1), "left")
.drop("id_1").distinct()
df_2.show()
+---+----------+-----+
| id| date|value|
+---+----------+-----+
| 1|2020-10-02| a|
| 2|2020-10-02| b|
| 4|2020-10-02| null|
| 5|2020-10-02| null|
| 6|2020-10-02| null|
+---+----------+-----+
现在要获得is_active
列,您可以联合然后使用窗口函数(row_number()
或rank()
,具体取决于您的需要(:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('id').orderBy(desc("date"))
df_1.union(df_2).withColumn("is_active", F.when(row_number().over(w)==1, 1)
.otherwise(0)).orderBy("id", "date").show()
+---+----------+-----+---------+
| id| date|value|is_active|
+---+----------+-----+---------+
| 1|2020-10-01| a| 0|
| 1|2020-10-02| a| 1|
| 2|2020-09-30| b| 0|
| 2|2020-10-01| b| 0|
| 2|2020-10-02| b| 1|
| 3|2020-10-01| c| 1|
| 4|2020-10-02| null| 1|
| 5|2020-10-02| null| 1|
| 6|2020-10-02| null| 1|
+---+----------+-----+---------+