Merge multiple Spark DataFrame rows into one row based on other columns, i.e. apply CDC



I have data of the following form in my Spark DataFrame:

+----+------+------+------+-----------+-------------------------+
| id | name | age  | city | operation | update_time             |
+----+------+------+------+-----------+-------------------------+
| 1  | jon  | 12   | NULL | INSERT    | 2021-10-11T16:11:00.378 |
| 1  | NULL | NULL | NY   | UPDATE    | 2021-10-11T17:11:00.378 |
| 1  | jack | NULL | NULL | UPDATE    | 2021-10-11T18:11:00.378 |
| 2  | sam  | 11   | TN   | INSERT    | 2021-10-11T18:11:00.378 |
| 3  | tim  | NULL | CA   | INSERT    | 2021-10-11T16:11:00.378 |
| 3  | NULL | 33   | MT   | UPDATE    | 2021-10-11T17:11:00.378 |
+----+------+------+------+-----------+-------------------------+



I tried to find a DataFrame function that would help me transform the data into the form below, but found nothing. The closest thing I could think of was a join, but that works across multiple DataFrames, and here I only have one. So how can I collapse each id's rows into a single row that carries the latest updated value of every column?

+----+------+-----+------+-------------------------+
| id | name | age | city | update_time             |
+----+------+-----+------+-------------------------+
| 1  | jack | 12  | NY   | 2021-10-11T18:11:00.378 |
| 2  | sam  | 11  | TN   | 2021-10-11T18:11:00.378 |
| 3  | tim  | 33  | MT   | 2021-10-11T17:11:00.378 |
+----+------+-----+------+-------------------------+
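You can do this with orderBy + groupBy + first: sort each id's rows by updateTime descending, then take the first non-null value of every column with first(..., ignoreNulls = true). Runnable example: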
package spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DataFrameWindowFunc extends App {

  val spark = SparkSession
    .builder()
    .master("local")
    .appName("DataFrame-WindowFunction")
    .getOrCreate()

  import spark.implicits._

  // Option fields model the sparse CDC events: None means the column
  // was not touched by that INSERT/UPDATE.
  case class Data(
      id: Int,
      name: Option[String],
      age: Option[Int],
      city: Option[String],
      operation: String,
      updateTime: String
  )

  val sourceData = Seq(
    Data(1, Some("jon"), Some(12), None, "INSERT", "2021-10-11T16:11:00.378"),
    Data(1, None, None, Some("NY"), "UPDATE", "2021-10-11T17:11:00.378"),
    Data(1, Some("jack"), None, None, "UPDATE", "2021-10-11T18:11:00.378"),
    Data(
      2,
      Some("sam"),
      Some(11),
      Some("TN"),
      "INSERT",
      "2021-10-11T18:11:00.378"
    ),
    Data(3, Some("tim"), None, Some("CA"), "INSERT", "2021-10-11T16:11:00.378"),
    Data(3, None, Some(33), Some("MT"), "UPDATE", "2021-10-11T17:11:00.378")
  ).toDF

  sourceData.show(false)
  // +---+----+----+----+---------+-----------------------+
  // |id |name|age |city|operation|updateTime             |
  // +---+----+----+----+---------+-----------------------+
  // |1  |jon |12  |null|INSERT   |2021-10-11T16:11:00.378|
  // |1  |null|null|NY  |UPDATE   |2021-10-11T17:11:00.378|
  // |1  |jack|null|null|UPDATE   |2021-10-11T18:11:00.378|
  // |2  |sam |11  |TN  |INSERT   |2021-10-11T18:11:00.378|
  // |3  |tim |null|CA  |INSERT   |2021-10-11T16:11:00.378|
  // |3  |null|33  |MT  |UPDATE   |2021-10-11T17:11:00.378|
  // +---+----+----+----+---------+-----------------------+

  // Sort each id's events newest first, then take the first non-null
  // value per column (first with ignoreNulls = true), so the most
  // recent value written to each column wins.
  val res = sourceData
    .orderBy(col("id"), col("updateTime").desc)
    .groupBy("id")
    .agg(
      first(col("name"), true).alias("name"),
      first(col("age"), true).alias("age"),
      first(col("city"), true).alias("city"),
      first(col("updateTime"), true).alias("update_time")
    )
    .orderBy(col("id"))

  res.show(false)
  // +---+----+---+----+-----------------------+
  // |id |name|age|city|update_time            |
  // +---+----+---+----+-----------------------+
  // |1  |jack|12 |NY  |2021-10-11T18:11:00.378|
  // |2  |sam |11 |TN  |2021-10-11T18:11:00.378|
  // |3  |tim |33 |MT  |2021-10-11T17:11:00.378|
  // +---+----+---+----+-----------------------+
}
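One caveat: Spark does not guarantee that the sort order from orderBy survives the shuffle of a subsequent groupBy, so which row first() sees inside each group is technically not deterministic, even if it works on a local run. Below is a sketch of a more explicitly deterministic variant using window functions; it assumes it runs in the same object (or shell session) where sourceData is defined, and the window w and helper column rn are names I made up, not part of the original answer.

import org.apache.spark.sql.expressions.Window

// Full-partition window per id, ordered oldest to newest, so
// last(..., ignoreNulls = true) sees every event for that id.
val w = Window
  .partitionBy("id")
  .orderBy(col("updateTime"))
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val resWindow = sourceData
  // For each column, the last non-null value in event order is the
  // current CDC value.
  .withColumn("name", last(col("name"), ignoreNulls = true).over(w))
  .withColumn("age", last(col("age"), ignoreNulls = true).over(w))
  .withColumn("city", last(col("city"), ignoreNulls = true).over(w))
  // Keep one row per id: the newest event, which carries the
  // latest update time.
  .withColumn(
    "rn",
    row_number().over(Window.partitionBy("id").orderBy(col("updateTime").desc))
  )
  .filter(col("rn") === 1)
  .select(
    col("id"),
    col("name"),
    col("age"),
    col("city"),
    col("updateTime").alias("update_time")
  )
  .orderBy(col("id"))

resWindow.show(false)
// Should produce the same output as res above.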