对于每个ID,我希望只选择基于";A";列基于某些条件。
用例只是一个例子。以下几点通过添加这些条件使垂直处理数据的练习变得复杂:
1-我想跟踪列"上的所有更改;A";,只保留该值首次出现的
2-我想保留NULL值
3-";A";可以高于或低于或等于前一个
4-我没有dt_run时间戳。所以我想假设,如果在同一天内;A";值,这是由于无法对dt_run进行排序造成的。如果今天包含的值等于前一天和下一天的值,我们可以删除它们(假设我们可以直接转到下一次更新[示例中为粗体](。如果这一天包含不同的值,则将该值赋予它[示例中为斜体]。
5-我期望的输出是在一行中有ID-dt_run。
Input df
ID | A | dt_run | |||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | 45 | 2022-02-11 | |||||||||||
1 | 72 | 2022-02-13 | |||||||||||
1 | 45 | 2022-02-13 | |||||||||||
1 | 72 | 2022-02-13 | |||||||||||
1 | 72 | 2022-02-15 | |||||||||||
1 | 45 | 2022-02-16 | |||||||||||
2 | 88 | 2022-02-16 | |||||||||||
2 | 88 | 2022-02-16 | |||||||||||
2 |
882 | 77 | 2022-02-17//td> | 2 | 空 | >2022-02-117 | 2 | 空 | 2022-02-18 | 2 | 92 | 2022-02-19 | |
可以使用窗口函数实现。参见下方的逻辑和代码
W=Window.partitionBy('ID').orderBy('dt_run')
new = (df.withColumn('x',row_number().over(W))#Create Row number for each ID
.withColumn('y',max('x').over(Window.partitionBy('A','ID')))#Find maximum index in a combination of A and ID
.where((col('x')==col('y'))|(col('x')==1))#Filter where index and maximum index are equal or the index is the first
.orderBy('ID','dt_run')#reorder the frame
.drop('x','y')#drop unwanted columns
).show(truncate=False)
+---+----+----------+
|ID |A |dt_run |
+---+----+----------+
|1 |45 |2022-02-11|
|1 |72 |2022-02-15|
|1 |45 |2022-02-16|
|2 |88 |2022-02-16|
|2 |77 |2022-02-17|
|2 |88 |2022-02-17|
|2 |null|2022-02-18|
|2 |92 |2022-02-19|
+---+----+----------+