有多个属于不同周的数据文件 - 所有格式相同的文件。我需要使用在Spark上运行的scala代码来整合文件。最终结果应仅是键的唯一记录,最终结果还应保留相同键字段的最新文件中的记录。
每个数据文件可能具有接近 1/2 十亿条记录,因此代码必须是高性能的......
例:
最新数据文件
CID PID Metric
C1 P1 10
C2 P1 20
C2 P2 30
以前的数据文件
CID PID Metric
C1 P1 20
C2 P1 30
C3 P1 40
C3 P2 50
最早的数据文件
CID PID Metric
C1 P1 30
C2 P1 40
C3 P1 50
C3 P2 60
C4 P1 30
输出文件预期
C1 P1 10
C2 P1 20
C2 P2 30
C3 P1 40
C3 P2 50
C4 P1 30
可以将"期限"列分配给每个数据帧,然后将数据帧合并为一个,然后使用窗口函数:
// data preparation
val columnNames = List("CID", "PID", "Metric")
val latest = List(
("C1", "P1", 10),
("C2", "P1", 20),
("C2", "P2", 30)
).toDF(columnNames: _*)
val previous = List(
("C1", "P1", 20),
("C2", "P1", 30),
("C3", "P1", 40),
("C3", "P2", 50)
).toDF(columnNames: _*)
val oldest = List(
("C1", "P1", 30),
("C2", "P1", 40),
("C3", "P1", 50),
("C3", "P2", 60),
("C4", "P1", 30)
).toDF(columnNames: _*)
// data preparation
val dfList = List(oldest, previous, latest)
val dfListWithIndexColumn = dfList.zipWithIndex.map { case (df, index) => df.withColumn("age", lit(index)) }
val unitedDF = dfListWithIndexColumn.reduce(_ union _)
val cidPidWindow = Window.partitionBy("CID", "PID").orderBy($"age".desc)
val result = unitedDF
.withColumn("rank", rank.over(cidPidWindow))
.where($"rank" === 1)
.drop("age", "rank")
result.show(false)
输出:
+---+---+------+
|CID|PID|Metric|
+---+---+------+
|C1 |P1 |10 |
|C2 |P1 |20 |
|C2 |P2 |30 |
|C3 |P1 |40 |
|C3 |P2 |50 |
|C4 |P1 |30 |
+---+---+------+