我有一个数据连接源,创建两个数据集:
- Dataset X (Snapshot)
- Dataset Y(增量)
两个数据集从同一个源提取。数据集X
由源表中所有行的当前状态组成。数据集Y
提取自上次构建以来更新的所有行。然后将这两个数据集下游合并到数据集Z
中,数据集Z
要么是数据集X
,要么是数据集Y
中每一行的最新版本。这使我们既可以获得低延迟更新,又可以维护良好的分区。
当源表中的行被删除时,这些行不再存在于数据集X
中,但仍然存在于数据集Y
中。
在数据集Z
中保留这些"删除"行的最好方法是什么?理想情况下,我也能够快照数据集Y
而不会丢失任何"已删除"的行。
好问题!据我所知,您希望数据集Z
只包含最新的行,包括最新的已删除行。更新的行和删除的行都存在于Y
中。在这种情况下,我建议首先将Y
和X
合并在一起,以便所有行(包括已删除的行)都出现在联合数据集中。然后,在日期列上使用窗口函数,以获取每行的最新版本。以下是我建议的pyspark代码大纲:
from pyspark.sql import Window
import pyspark.sql.functions as F
window = Window.partitionBy(primary_keys).orderBy(F.col(date_column).desc())
Z = X.unionByName(Y) # union to get all columns, including deleted
Z = Z.withColumn("row_num", F.row_number().over(window)) # rank by date created/updated
Z = Z.filter(F.col("row_num") == 1).drop("row_num") # keep only the latest version of each row
请注意,这个解决方案并没有解决如果Y快照会发生什么的问题。