I have data of the following form in my Spark DataFrame.
+----+------+------+------+-----------+-------------------------+
| id | name | age  | city | operation | update_time             |
+----+------+------+------+-----------+-------------------------+
| 1  | jon  | 12   | NULL | INSERT    | 2021-10-11T16:11:00.378 |
| 1  | NULL | NULL | NY   | UPDATE    | 2021-10-11T17:11:00.378 |
| 1  | jack | NULL | NULL | UPDATE    | 2021-10-11T18:11:00.378 |
| 2  | sam  | 11   | TN   | INSERT    | 2021-10-11T18:11:00.378 |
| 3  | tim  | NULL | CA   | INSERT    | 2021-10-11T16:11:00.378 |
| 3  | NULL | 33   | MT   | UPDATE    | 2021-10-11T17:11:00.378 |
+----+------+------+------+-----------+-------------------------+
I looked for a DataFrame function that would help me transform the data into the form below, but found nothing. The closest I could think of is a join, but a join works across multiple DataFrames and here I only have one. So how can I collapse the rows for each id into a single row that carries the latest non-null value of every column?
+----+------+-----+------+-------------------------+
| id | name | age | city | update_time             |
+----+------+-----+------+-------------------------+
| 1  | jack | 12  | NY   | 2021-10-11T18:11:00.378 |
| 2  | sam  | 11  | TN   | 2021-10-11T18:11:00.378 |
| 3  | tim  | 33  | MT   | 2021-10-11T17:11:00.378 |
+----+------+-----+------+-------------------------+
package spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DataFrameWindowFunc extends App {

  val spark = SparkSession
    .builder()
    .master("local")
    .appName("DataFrame-WindowFunction")
    .getOrCreate()

  import spark.implicits._

  case class Data(
      id: Int,
      name: Option[String],
      age: Option[Int],
      city: Option[String],
      operation: String,
      updateTime: String
  )

  val sourceData = Seq(
    Data(1, Some("jon"), Some(12), None, "INSERT", "2021-10-11T16:11:00.378"),
    Data(1, None, None, Some("NY"), "UPDATE", "2021-10-11T17:11:00.378"),
    Data(1, Some("jack"), None, None, "UPDATE", "2021-10-11T18:11:00.378"),
    Data(2, Some("sam"), Some(11), Some("TN"), "INSERT", "2021-10-11T18:11:00.378"),
    Data(3, Some("tim"), None, Some("CA"), "INSERT", "2021-10-11T16:11:00.378"),
    Data(3, None, Some(33), Some("MT"), "UPDATE", "2021-10-11T17:11:00.378")
  ).toDF

  sourceData.show(false)
  // +---+----+----+----+---------+-----------------------+
  // |id |name|age |city|operation|updateTime             |
  // +---+----+----+----+---------+-----------------------+
  // |1  |jon |12  |null|INSERT   |2021-10-11T16:11:00.378|
  // |1  |null|null|NY  |UPDATE   |2021-10-11T17:11:00.378|
  // |1  |jack|null|null|UPDATE   |2021-10-11T18:11:00.378|
  // |2  |sam |11  |TN  |INSERT   |2021-10-11T18:11:00.378|
  // |3  |tim |null|CA  |INSERT   |2021-10-11T16:11:00.378|
  // |3  |null|33  |MT  |UPDATE   |2021-10-11T17:11:00.378|
  // +---+----+----+----+---------+-----------------------+

  // Collapse each id to one row: sort newest-first, then take the first
  // non-null value per column, so the latest update of each field wins.
  val res = sourceData
    .orderBy(col("id"), col("updateTime").desc)
    .groupBy("id")
    .agg(
      first(col("name"), ignoreNulls = true).alias("name"),
      first(col("age"), ignoreNulls = true).alias("age"),
      first(col("city"), ignoreNulls = true).alias("city"),
      first(col("updateTime"), ignoreNulls = true).alias("update_time")
    )
    .orderBy(col("id"))

  res.show(false)
  // +---+----+---+----+-----------------------+
  // |id |name|age|city|update_time            |
  // +---+----+---+----+-----------------------+
  // |1  |jack|12 |NY  |2021-10-11T18:11:00.378|
  // |2  |sam |11 |TN  |2021-10-11T18:11:00.378|
  // |3  |tim |33 |MT  |2021-10-11T17:11:00.378|
  // +---+----+---+----+-----------------------+
}
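One caveat with the approach above: Spark does not formally guarantee that the sort order from `orderBy` survives the shuffle introduced by `groupBy`, so `first(..., ignoreNulls = true)` can behave non-deterministically once the data spans multiple partitions (it tends to work with `master("local")`). A deterministic alternative is to forward-fill each column with a window function and then keep only the newest row per `id`. The sketch below assumes the same input; the object name `DataFrameCollapseRows` is mine:

```scala
package spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object DataFrameCollapseRows extends App {

  val spark = SparkSession
    .builder()
    .master("local")
    .appName("DataFrame-CollapseRows")
    .getOrCreate()

  import spark.implicits._

  val sourceData = Seq(
    (1, Some("jon"), Some(12), Option.empty[String], "INSERT", "2021-10-11T16:11:00.378"),
    (1, Option.empty[String], Option.empty[Int], Some("NY"), "UPDATE", "2021-10-11T17:11:00.378"),
    (1, Some("jack"), Option.empty[Int], Option.empty[String], "UPDATE", "2021-10-11T18:11:00.378"),
    (2, Some("sam"), Some(11), Some("TN"), "INSERT", "2021-10-11T18:11:00.378"),
    (3, Some("tim"), Option.empty[Int], Some("CA"), "INSERT", "2021-10-11T16:11:00.378"),
    (3, Option.empty[String], Some(33), Some("MT"), "UPDATE", "2021-10-11T17:11:00.378")
  ).toDF("id", "name", "age", "city", "operation", "updateTime")

  // Forward-fill: within each id, ordered by updateTime, carry forward the
  // latest non-null value seen so far for each column.
  val fill = Window
    .partitionBy("id")
    .orderBy("updateTime")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

  // Rank rows newest-first so exactly one row per id can be kept.
  val newestFirst = Window.partitionBy("id").orderBy(col("updateTime").desc)

  val res = sourceData
    .withColumn("name", last(col("name"), ignoreNulls = true).over(fill))
    .withColumn("age", last(col("age"), ignoreNulls = true).over(fill))
    .withColumn("city", last(col("city"), ignoreNulls = true).over(fill))
    .withColumn("rn", row_number().over(newestFirst))
    .filter(col("rn") === 1)
    .select(
      col("id"), col("name"), col("age"), col("city"),
      col("updateTime").alias("update_time")
    )
    .orderBy("id")

  res.show(false)
  // Expected: the same three collapsed rows as the groupBy version above.
}
```

Because every `last(..., ignoreNulls = true)` is evaluated over an explicitly ordered window, the result does not depend on partitioning or on any implicit ordering guarantee.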