I need a rank id on a DataFrame based on score. A simple row_number() as rank_id does not work, because all the data gets collected onto a single machine. For example:
select *, row_number() over (order by score) as rank_id from tbl order by score
monotonically_increasing_id() will not give me what I want either, because I need consecutive rank ids. The same thing is fairly simple to do in MapReduce, but I have not found a way to do it in Spark, which is odd...
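For illustration, a quick sketch (Scala here, with a toy score column I made up) of why monotonically_increasing_id() is not consecutive: it encodes the partition id in the upper bits of the 64-bit value, so the ids jump at partition boundaries.

// Sketch with toy data: ids are unique and increasing, but not consecutive,
// because the partition id sits in the upper 31 bits of each generated value
import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._

Seq(3.0, 1.0, 2.0, 5.0, 4.0).toDF("score")
  .repartition(3)
  .withColumn("id", monotonically_increasing_id())
  .show()
// ids come out something like 0, 1, 8589934592, ... rather than 0, 1, 2, ...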
Actually, a colleague gave me a solution that I find clean and simple: use zipWithIndex. In pyspark it looks something like this:
(df
 .select('score', 'user_id')
 .rdd
 .sortBy(lambda a: a[0], ascending=False)
 .zipWithIndex()
 .map(lambda x: (x[0][0], x[0][1], x[1]))
 .take(3))
I had some time to look at this, although 1) I think a traditional Oracle DB may actually be better suited here, and 2) I notice that Databricks is really quite slow at the moment.
In any event, Spark works per partition (in parallel) rather than across partitions, for better throughput, and that is the crux of the problem. I am not sure it would be easier in MR, but if it is, then use that, even though it is less in vogue these days.
I rolled my own approach here, which uses range partitioning for rank / dense rank. The idea is that for a given range of values, identical values belong to exactly one partition, so you can apply the ranking within each partition and then add an overall ranking, i.e. an offset, per partition, relying on the fact that range partitions with ascending value ranges sit in ascending partition order. Not entirely sure whether all the caching is fine, and it takes a while even for a small amount of data, but I suspect many of us are studying indoors these days anyway, given the current situation.
Also, this is a good source: https://www.waitingforcode.com/apache-spark-sql/range-partitioning-apache-spark-sql/read
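As a quick sanity check of that claim, here is a minimal sketch (toy amounts and a made-up column name, not part of the solution itself) counting how many partitions each distinct value lands in after repartitionByRange; the expectation is exactly one per value.

// Sketch with toy data: after repartitionByRange every distinct amount should
// sit in exactly one partition, which is what the offset trick below relies on
import org.apache.spark.sql.functions.{spark_partition_id, countDistinct}
import spark.implicits._

Seq(1d, 1d, 5d, 5d, 9d, 9d, 20d).toDF("amount")
  .repartitionByRange(3, $"amount")
  .withColumn("pid", spark_partition_id())
  .groupBy("amount")
  .agg(countDistinct("pid").as("num_partitions"))
  .show()
// num_partitions should be 1 for every amount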
Code
// Took some shortcuts on field names, concentrating on the algorithm itself so as to avoid the single-partition aspect
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Window
case class X(id: Long, name: String, amount: Double)
Data:
// Gen example data via DF
val df = Seq(
(10, "order 1", 2000d), (11, "order 2", 240d), (12, "order 3", 232d), (13, "order 4", 100d), (214, "order 5", 11d), (15, "order 6", 1d),
(2141, "order 7", 390d), (17, "order 8", 30d), (18, "order 9", 99d), (19, "order 10", 55d), (20, "order 11", 129d), (21, "order 11", 75d), (15, "order 13", 1d)
).toDF("id", "name", "amount")
Processing:
// Make a Dataset; the case class makes the RDD conversion and the trip back to DF/DS easier
val ds = df.as[X]
// Number of partitions for parallel processing, so as to increase throughput; n can be anything, but with more partitions you get better throughput
// Range partitioning has all values of same value in same partition - that is the clue here
// dense and non-dense rank possibilities, not just order by row number as in your example
val n = 5
val rdd = ds.repartitionByRange(n, $"amount")
.rdd
.mapPartitionsWithIndex((index, iter) => {
iter.map(x => (index, x ))
})
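// Each row is now (range-partition index, X); after toDF below these become columns _1 and _2 (a struct)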
val df2 = rdd.toDF().cache
df2.createOrReplaceTempView("tab1")
//Get the ranking per range partition
val res1 = spark.sql("""select *, rank() over (partition by _1 order by _2.amount asc) as RANK from tab1 """).cache
//res1.rdd.getNumPartitions // the number of partitions does not change, the range partitioning is maintained
res1.createOrReplaceTempView("tab2")
// Get the max rank per partition. Ideally needs caching, or writing to disk, to avoid oddities from recomputation and caching quirks. Not always convinced it works.
spark.sql("drop table if exists MAXVALS")
spark.sql(""" create table MAXVALS as select 'dummy' as dummy, _1, max(RANK) as max_rank from tab2 GROUP BY _1 UNION SELECT 'dummy', -1, 0 """)
val resA = spark.table("MAXVALS")
// Get offsets
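// Running total of per-partition max ranks, ordered by partition index; the constant 'dummy'
// column puts all rows into one window partition so the cumulative sum is global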
val resB = resA.withColumn("cum_Max_RANK", sum("max_rank").over(
Window
.partitionBy("dummy")
.orderBy(col("_1")) ))
resB.createOrReplaceTempView("tabC")
//So all the stuff works in parallel, but does it really help??? Is an RDBMS not better then???
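// Each rank is shifted by the cumulative max rank of all lower range partitions (join on tabc._1 = tab2._1 - 1)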
val finalResult = spark.sql(""" select tab2._2, (tab2.RANK + tabC.cum_Max_RANK) as OVERALLRANK from tab2, tabc where tabc._1 = (tab2._1 -1) ORDER BY OVERALLRANK ASC """)
finalResult.show(false)
Result
+----------------------+-----------+
|_2 |OVERALLRANK|
+----------------------+-----------+
|[15, order 6, 1.0] |1 |
|[15, order 13, 1.0] |1 |
|[214, order 5, 11.0] |3 |
|[17, order 8, 30.0] |4 |
|[19, order 10, 55.0] |5 |
|[21, order 11, 75.0] |6 |
|[18, order 9, 99.0] |7 |
|[13, order 4, 100.0] |8 |
|[20, order 11, 129.0] |9 |
|[12, order 3, 232.0] |10 |
|[11, order 2, 240.0] |11 |
|[2141, order 7, 390.0]|12 |
|[10, order 1, 2000.0] |13 |
+----------------------+-----------+
Conclusion
It works. But does it all make sense? Yes, because getting the job done is always better than an OOM.
- The initial computation can sort in parallel, but it needs repartitionByRange.
- However, if you want a guaranteed sort order (for collect, show), the final statement as a whole still needs the ORDER BY.
- explain() does not show a single-partition situation, but this must be tested at scale. That said, the OOM is avoided; I suspect some unnecessary sorting still goes on. sortWithinPartitions may be worth a look, but I have not pursued it for now (an untried sketch follows the modified clause below).
- Added some logic to the clause below to see whether the Optimizer can be convinced to avoid shuffling, but assessing the .explain() output it seems unnecessary shuffling still occurs. There is room for improvement.
Modified clause to try and influence Catalyst:
val finalResult = spark.sql(""" select tab2._1 as Z, tab2._2, (tab2.RANK + tabC.cum_Max_RANK) as OVERALLRANK from tab2, tabc where tabc._1 = (tab2._1 -1) ORDER BY Z ASC, OVERALLRANK ASC """)
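For completeness, an untried sketch of the sortWithinPartitions idea mentioned above: drop the global ORDER BY and only sort within each partition, which avoids the final global sort/shuffle at the price of an ordering guarantee that holds only within partitions. It reuses the tab2 / tabC temp views from above; I have not benchmarked it.

// Untried sketch: per-partition ordering only, so no global sort at the end;
// output order across partitions is then only implied by the range partitioning
import org.apache.spark.sql.functions.col
val locallyOrdered = spark.sql(""" select tab2._1 as Z, tab2._2, (tab2.RANK + tabC.cum_Max_RANK) as OVERALLRANK from tab2, tabc where tabc._1 = (tab2._1 - 1) """)
  .sortWithinPartitions(col("Z"), col("OVERALLRANK"))
locallyOrdered.show(false)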
Final comment for the question poser
The poster is looking for an understandable one-liner approach. Functionally it is clear what is needed, but the OOM shows it is not technically feasible that way, otherwise there would be no OOM. The single-partition approach has to be circumvented, and that means using more parallelism, which is what this does. The partitioned approach is fundamental to Spark.