pyspark:在pyspark中连接两个数据框时,根据字段ingested_at合并两个有效负载字段



下面有两个数据框

df1:

ingested_at16522652685761652265283215

可能有一个更优的解决方案,但从你给的信息来看,我假设df1df2具有相同的模式

  1. 从合并两个dataframe开始:
  2. txn_id
  3. 的每个值进行排序以获取最新记录
  4. 丢弃排序表明不是最新记录的行

实现看起来像这样:

from pyspark.sql.functions import *
from pyspark.sql.window import Window
final_df = df1 
.unionByName(df2) 
.withColumn('row_num', row_number().over(Window.partitionBy('txn_id').orderBy(desc('ingested_at')))) 
.filter(col('row_num') > 1) 
.drop('row_num')

final_df将包含每个txn_id的最新记录

最新更新