1652265268576 1652265283215
下面有两个数据框
df1:
ingested_at可能有一个更优的解决方案,但从你给的信息来看,我假设df1
和df2
具有相同的模式
- 从合并两个dataframe开始:
- 对
txn_id
的每个值进行排序以获取最新记录 - 丢弃排序表明不是最新记录的行
实现看起来像这样:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
final_df = df1
.unionByName(df2)
.withColumn('row_num', row_number().over(Window.partitionBy('txn_id').orderBy(desc('ingested_at'))))
.filter(col('row_num') > 1)
.drop('row_num')
final_df
将包含每个txn_id
的最新记录