Scala Code - 使用关键字段在多个日期中合并多个相同格式的数据文件,并仅保留最新的日期数据



有多个属于不同周的数据文件 - 所有格式相同的文件。我需要使用在Spark上运行的scala代码来整合文件。最终结果应仅是键的唯一记录,最终结果还应保留相同键字段的最新文件中的记录。

每个数据文件可能具有接近 1/2 十亿条记录,因此代码必须是高性能的......

例:

最新数据文件

CID PID Metric
C1  P1  10
C2  P1  20
C2  P2  30

以前的数据文件

CID PID Metric
C1  P1  20
C2  P1  30
C3  P1  40
C3  P2  50

最早的数据文件

CID PID Metric
C1  P1  30
C2  P1  40
C3  P1  50
C3  P2  60
C4  P1  30

输出文件预期

C1  P1  10
C2  P1  20
C2  P2  30
C3  P1  40
C3  P2  50
C4  P1  30

可以将"期限"列分配给每个数据帧,然后将数据帧合并为一个,然后使用窗口函数:

// data preparation
val columnNames = List("CID", "PID", "Metric")
val latest = List(
("C1", "P1", 10),
("C2", "P1", 20),
("C2", "P2", 30)
).toDF(columnNames: _*)
val previous = List(
("C1", "P1", 20),
("C2", "P1", 30),
("C3", "P1", 40),
("C3", "P2", 50)
).toDF(columnNames: _*)
val oldest = List(
("C1", "P1", 30),
("C2", "P1", 40),
("C3", "P1", 50),
("C3", "P2", 60),
("C4", "P1", 30)
).toDF(columnNames: _*)
//  data preparation
val dfList = List(oldest, previous, latest)
val dfListWithIndexColumn = dfList.zipWithIndex.map { case (df, index) => df.withColumn("age", lit(index)) }
val unitedDF = dfListWithIndexColumn.reduce(_ union _)
val cidPidWindow = Window.partitionBy("CID", "PID").orderBy($"age".desc)
val result = unitedDF
.withColumn("rank", rank.over(cidPidWindow))
.where($"rank" === 1)
.drop("age", "rank")
result.show(false)

输出:

+---+---+------+
|CID|PID|Metric|
+---+---+------+
|C1 |P1 |10    |
|C2 |P1 |20    |
|C2 |P2 |30    |
|C3 |P1 |40    |
|C3 |P2 |50    |
|C4 |P1 |30    |
+---+---+------+

最新更新