>我有一个数据集,groupby("_1","_2","_3","_4").agg(max("_5").as("time"),collect_list("_6").as("value"))
返回一个数据集,其中包含四列的分组数据和max
的时间列和collect_list
,其中包含该分组数据的所有值,例如[5,1]
但我想要_6
是与所有分组列匹配的值,并且不仅对分组列还具有max("_5").as("time")
代码如下:
val data = Seq(("thing1",1,1,"Temperature",1551501300000L,"5"),("thing1",1,1,"Temperature",1551502200000L,"1"))
import org.apache.spark.sql.functions._
val dataSet = spark.sparkContext.parallelize(data)
import spark.implicits._
val testDS = dataSet.toDS()
testDS.groupby("_1","_2","_3","_4").agg(max("_5").as("time"),collect_list("_6").as("value")).show()
输出:
| _1 | _2 | _3 | _4 | time | value |
|thingId1 | 1 | 1 |Temperature | 1551502200000 | [5,1] |
所需输出
| _1 | _2 | _3 | _4 | time | value |
|thingId1 | 1 | 1 |Temperature | 1551502200000 | 1 |
我不希望值 5 在值列中,因为它不在标准范围内max("time")
我只需要值列中的 1,因为它只匹配所有分组列的条件和max("time")
.
如何实现这一点。
谢谢。
你可以通过使用argmax逻辑来巧妙地做到这一点,而不必使用Window
函数,如下所示:
val data = Seq(("thing1",1,1,"Temperature",1551501300000L,"5"),
("thing1",1,1,"Temperature",1551502200000L,"1")).toDF
data.groupBy("_1","_2","_3","_4").agg(
max(struct("_5", "_6")).as("argmax")).select("_1","_2","_3","_4", "argmax.*").show
+------+---+---+-----------+-------------+---+
| _1| _2| _3| _4| _5| _6|
+------+---+---+-----------+-------------+---+
|thing1| 1| 1|Temperature|1551502200000| 1|
+------+---+---+-----------+-------------+---+
当您在 Spark 的struct
上使用max
时,它会返回具有最高第一个值的struct
,如果有第一个值相等的structs
,则它转到第二个值,依此类推。获得max
struct
后,您可以使用*
通配符从struct
中提取值。
在此方案中使用Window
函数:
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("_1","_2","_3","_4").orderBy(desc("_5"))
testDS.withColumn("rowSelector", row_number() over windowSpec)
.where($"rowSelector" === 1)
.drop($"rowSelector")
.show(false)
输出:
+------+---+---+-----------+-------------+---+
|_1 |_2 |_3 |_4 |_5 |_6 |
+------+---+---+-----------+-------------+---+
|thing1|1 |1 |Temperature|1551502200000|1 |