如何在Spark DataFrame上应用部分排序



以下代码:

val myDF = Seq(83, 90, 40, 94, 12, 70, 56, 70, 28, 91).toDF("number")
myDF.orderBy("number").limit(3).show

输出:

+------+
|number|
+------+
|    12|
|    28|
|    40|
+------+

Spark的惰性与limit调用和orderBy的实现相结合,会自动产生部分排序的DataFrame吗?或者,即使不需要,剩余的7个数字也会排序吗?如果是这样,有没有办法避免这种不必要的计算工作?


使用.explain()显示,执行两个排序阶段,首先在每个分区上,然后(每个分区的前3个(全局排序。但它并没有说明这些分类是完整的还是部分的。

myDF.orderBy("number").limit(3).explain(true)
== Parsed Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3416 ASC NULLS FIRST], true
+- Project [value#3414 AS number#3416]
+- LocalRelation [value#3414]
== Analyzed Logical Plan ==
number: int
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3416 ASC NULLS FIRST], true
+- Project [value#3414 AS number#3416]
+- LocalRelation [value#3414]
== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3416 ASC NULLS FIRST], true
+- LocalRelation [number#3416]
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[number#3416 ASC NULLS FIRST], output=[number#3416])
+- LocalTableScan [number#3416]

如果explain()您的数据帧,您会发现Spark将首先执行一个"本地";在每个分区内进行排序,然后从每个分区中只选择前三个元素进行最终的全局排序,然后再从中取出前三个

scala> myDF.orderBy("number").limit(3).explain(true)
== Parsed Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3 ASC NULLS FIRST], true
+- Project [value#1 AS number#3]
+- LocalRelation [value#1]
== Analyzed Logical Plan ==
number: int
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3 ASC NULLS FIRST], true
+- Project [value#1 AS number#3]
+- LocalRelation [value#1]
== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
+- Sort [number#3 ASC NULLS FIRST], true
+- LocalRelation [number#3]
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[number#3 ASC NULLS FIRST], output=[number#3])
+- LocalTableScan [number#3]

我认为它最好在优化逻辑计划部分看到,但物理上也说明了同样的事情。

  1. myDF.orderBy("number"(.limit(3(.show
  2. myDF.limit(3(.orderBy("number"(.show

1=>将进行完全排序,然后选择前3个元素。

2=>将返回带有前3个元素的数据帧并进行排序。

相关内容

  • 没有找到相关文章

最新更新