Env: Spark 1.6, Scala
你好
我在数据帧中有记录,如波纹管
reportTime serverNAme channel viewer
2017-01-12 19:16:58.76 MTRL28DSERV722 Channel1 1192
2017-01-12 19:16:59.213 MTRL28DSERV722 Channel1 668
2017-01-12 19:17:05.193 BBBN23DSERV000 Channel1 795
2017-01-12 19:17:01.15 BBBN23DSERV000 Channel1 700
2017-01-12 19:16:58.76 MTRL28DSERV722 Channel3 100
2017-01-12 19:16:59.213 MTRL28DSERV722 Channel3 110
2017-01-12 19:17:05.193 BBBN23DSERV000 Channel3 200
2017-01-12 19:17:01.15 BBBN23DSERV000 Channel3 50
我需要输出:每个服务器按频道的最新查看器 - 以便
2017-01-12 19:16:59.213 MTRL28DSERV722 Channel1 668
2017-01-12 19:17:05.193 BBBN23DSERV000 Channel1 795
2017-01-12 19:16:59.213 MTRL28DSERV722 Channel3 100
2017-01-12 19:17:05.193 BBBN23DSERV000 Channel3 200
我怎样才能得到结果?我不想将DF注册为temptable并在其上使用SQL,因为SQL在Spark 1.6中没有优化。
谢谢
侯赛因
您可以在 severName
和 channel
的每个组合中按reportTime
降序创建一个row_number
,然后使用 where
过滤每个组中的第一行,该行应该是具有最新报告时间的第一行:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
val w = Window.partitionBy($"serverNAme", $"channel").orderBy($"reportTime".desc)
df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop($"rn").show
+--------------------+--------------+--------+------+
| reportTime| serverNAme| channel|viewer|
+--------------------+--------------+--------+------+
|2017-01-12 19:17:...|BBBN23DSERV000|Channel3| 200|
|2017-01-12 19:16:...|MTRL28DSERV722|Channel3| 110|
|2017-01-12 19:17:...|BBBN23DSERV000|Channel1| 795|
|2017-01-12 19:16:...|MTRL28DSERV722|Channel1| 668|
+--------------------+--------------+--------+------+