我正在使用Spark 1.5.0。我有一个Spark DataFrame,带有以下列:
| user_id | description | fName | weight |
我想做的是每个用户选择前10排和底部10行(基于列重量的值,即数据类型double)。我该如何使用Spark SQL或DataFrame操作?
例如。为简单起见,我只选择每个用户的顶部2行(基于重量)。我想根据绝对重量的值对O/P进行分类。
u1 desc1 f1 -0.20
u1 desc1 f1 +0.20
u2 desc1 f1 0.80
u2 desc1 f1 -0.60
u1 desc1 f1 1.10
u1 desc1 f1 6.40
u2 desc1 f1 0.05
u1 desc1 f1 -3.20
u2 desc1 f1 0.50
u2 desc1 f1 -0.70
u2 desc1 f1 -0.80
这是所需的O/P:
u1 desc1 f1 6.40
u1 desc1 f1 -3.20
u1 desc1 f1 1.10
u1 desc1 f1 -0.20
u2 desc1 f1 0.80
u2 desc1 f1 -0.80
u2 desc1 f1 -0.70
u2 desc1 f1 0.50
您可以使用 row_number
:
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"user_id")
val rankAsc = row_number().over(w.orderBy($"weight")).alias("rank_asc")
val rankDesc = row_number().over(w.orderBy($"weight".desc)).alias("rank_desc")
df.select($"*", rankAsc, rankDesc).filter($"rank_asc" <= 2 || $"rank_desc" <= 2)
在Spark 1.5.0中,您可以使用rowNumber
而不是row_number
。