I have a DataFrame like the following:
Rowkey timestamp col_1 col_2 col_3.... col_n
1234 165789 20 null 30 ... null
1234 155789 20 20 null ... 40
1234 145789 20 10 30 ... 50
I expect to convert it into the following DataFrame:
Rowkey timestamp col_1 col_2 col_3.... col_n
1234 165789 20 20 30 ... 40
I want the latest timestamp. In addition, if a cell is null and the next cell with the same Rowkey has a value, that value should be used.
I am using Spark with Scala.
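Here is my take:
Use a Window function to select the first non-null value in each Rowkey partition, ordered by timestamp descending, then drop the duplicates so that each Rowkey has only one row.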
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
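// Sample data; java.lang.Integer (not Int) is used so that cells can hold null.
val simpleData: Seq[(String, Integer, Integer, Integer, Integer, Integer)] = Seq(
  ("1234", 165789, 20, null, 30, null),
  ("1234", 155789, 10, 20, null, 40),
  ("1234", 145789, 2, 10, 30, 50),
  ("123e4", 145789, 2, 10, 30, 50)
)
val someDF = simpleData.toDF("Rowkey", "timestamp", "col_1", "col_2", "col_3", "col_4")
someDF.show()

val listCols = List("Rowkey", "timestamp", "col_1", "col_2", "col_3", "col_4")

// One window per Rowkey, newest row first. The frame spans the whole partition,
// so every row of a Rowkey sees the same first non-null value for each column.
val windowSpec = Window
  .partitionBy("Rowkey")
  .orderBy($"timestamp".desc)
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

someDF
  .select(listCols.map(m => first(m, true).over(windowSpec).alias(m)): _*)
  .dropDuplicates() // all rows of a Rowkey are identical at this point, so one remains
  .show()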
Result:
+------+---------+-----+-----+-----+-----+
|Rowkey|timestamp|col_1|col_2|col_3|col_4|
+------+---------+-----+-----+-----+-----+
| 1234| 165789| 20| null| 30| null|
| 1234| 155789| 10| 20| null| 40|
| 1234| 145789| 2| 10| 30| 50|
| 123e4| 145789| 2| 10| 30| 50|
+------+---------+-----+-----+-----+-----+
+------+---------+-----+-----+-----+-----+
|Rowkey|timestamp|col_1|col_2|col_3|col_4|
+------+---------+-----+-----+-----+-----+
| 1234| 165789| 20| 20| 30| 40|
| 123e4| 145789| 2| 10| 30| 50|
+------+---------+-----+-----+-----+-----+
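As a possible alternative to the window plus dropDuplicates approach above, the same result can probably be obtained with a single groupBy aggregation. The sketch below is an assumption-laden variant, not the method used above: for every value column it wraps the non-null cells in a (timestamp, value) struct, takes the max of that struct (structs compare field by field, and max skips nulls), and then extracts the value. The names valueCols, latestNonNull and result are made up for illustration, and the local SparkSession is only there to make the snippet self-contained. If two rows of the same Rowkey share a timestamp, this variant picks the larger value rather than an arbitrary one.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session for illustration only; in spark-shell a session already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("latest-non-null")
  .getOrCreate()
import spark.implicits._

// Same sample data as above.
val someDF = Seq[(String, Integer, Integer, Integer, Integer, Integer)](
  ("1234", 165789, 20, null, 30, null),
  ("1234", 155789, 10, 20, null, 40),
  ("1234", 145789, 2, 10, 30, 50),
  ("123e4", 145789, 2, 10, 30, 50)
).toDF("Rowkey", "timestamp", "col_1", "col_2", "col_3", "col_4")

// Every column except the key and the timestamp (hypothetical helper name).
val valueCols = someDF.columns.filterNot(c => c == "Rowkey" || c == "timestamp")

// For each value column: keep (timestamp, value) only where the value is non-null,
// take the struct with the greatest timestamp, then pull the value back out.
val latestNonNull = valueCols.map { c =>
  max(when(col(c).isNotNull, struct(col("timestamp"), col(c))))
    .getField(c)
    .alias(c)
}

val result = someDF
  .groupBy("Rowkey")
  .agg(max("timestamp").alias("timestamp"), latestNonNull: _*)

result.show()
```

On the sample data this should give one row per Rowkey with the same values as the output above, although the row order is not guaranteed. Because it aggregates instead of computing a window over every row and then deduplicating, it may shuffle less data, but that is worth measuring on the real dataset.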