只保留基于垂直条件为PySpark的列的更新日期



对于每个ID,我希望只选择基于";A";列基于某些条件。

用例只是一个例子。以下几点通过添加这些条件使垂直处理数据的练习变得复杂:

1-我想跟踪列"上的所有更改;A";,只保留该值首次出现的

2-我想保留NULL

3-";A";可以高于或低于或等于前一个

4-我没有dt_run时间戳。所以我想假设,如果在同一天内;A";值,这是由于无法对dt_run进行排序造成的。如果今天包含的值等于前一天和下一天的值,我们可以删除它们(假设我们可以直接转到下一次更新[示例中为粗体](。如果这一天包含不同的值,则将该值赋予它[示例中为斜体]。

5-我期望的输出是在一行中有ID-dt_run

Input df

ID A dt_run
1 45 2022-02-11
1 72 2022-02-13
1 45 2022-02-13
1 72 2022-02-13
1 72 2022-02-15
1 45 2022-02-16
2 88 2022-02-16
2 88 2022-02-16
2 88
2772022-02-17//td>
2>2022-02-117
22022-02-18
2922022-02-19

可以使用窗口函数实现。参见下方的逻辑和代码

W=Window.partitionBy('ID').orderBy('dt_run')
new = (df.withColumn('x',row_number().over(W))#Create Row number for each ID
.withColumn('y',max('x').over(Window.partitionBy('A','ID')))#Find maximum index in a combination of A and ID
.where((col('x')==col('y'))|(col('x')==1))#Filter where index and maximum index are equal or the index is the first
.orderBy('ID','dt_run')#reorder the frame
.drop('x','y')#drop unwanted columns
).show(truncate=False)
+---+----+----------+
|ID |A   |dt_run    |
+---+----+----------+
|1  |45  |2022-02-11|
|1  |72  |2022-02-15|
|1  |45  |2022-02-16|
|2  |88  |2022-02-16|
|2  |77  |2022-02-17|
|2  |88  |2022-02-17|
|2  |null|2022-02-18|
|2  |92  |2022-02-19|
+---+----+----------+

最新更新