最近在接受采访时被问到spark df.show()函数的算法。火花将如何决定的遗嘱执行人/执行人将获取的记录吗?
在不破坏@thebluephantom和@Hristo Iliev的答案(每个人都对幕后发生的事情有一些见解)的情况下,我也想把我的答案添加到这个列表中。
我得出了相同的结论,尽管是通过观察底层分区的行为。
分区有一个与之关联的索引。如下面的代码所示:
(摘自spark原始源代码)
trait Partition extends Serializable {
def index: Int
:
分区之间是有顺序的
正如在其他答案中已经提到的,df.show()
与df.show(20)
或前20行相同。因此,底层分区索引决定了这些行来自哪个分区(以及执行器)。
分区索引在读取时分配,或者在shuffle时(重新分配)。
下面是一些代码来查看这种行为:
val df = Seq((5,5), (6,6), (7,7), (8,8), (1,1), (2,2), (3,3), (4,4)).toDF("col1", "col2")
// above sequence is defined out of order - to make behaviour visible
// see partition structure
df.rdd.glom().collect()
/* Array(Array([5,5]), Array([6,6]), Array([7,7]), Array([8,8]), Array([1,1]), Array([2,2]), Array([3,3]), Array([4,4])) */
df.show(4, false)
/*
+----+----+
|col1|col2|
+----+----+
|5 |5 |
|6 |6 |
|7 |7 |
|8 |8 |
+----+----+
only showing top 4 rows
*/
在上面的代码中,我们看到8个分区(每个内部数组都是一个分区)——这是因为当我们创建一个数据框时,spark默认为8个分区。
现在让我们重新划分数据框。
// Now let's repartition df
val df2 = df.repartition(2)
// lets see the partition structure
df2.rdd.glom().collect()
/* Array(Array([5,5], [6,6], [7,7], [8,8], [1,1], [2,2], [3,3], [4,4]), Array()) */
// lets see output
df2.show(4,false)
/*
+----+----+
|col1|col2|
+----+----+
|5 |5 |
|6 |6 |
|7 |7 |
|8 |8 |
+----+----+
only showing top 4 rows
*/
在上面的代码中,前4行来自第一个分区(它实际上包含原始数据的所有元素)。还要注意分区大小的倾斜,因为没有提到分区列。
现在让我们尝试创建3个分区
val df3 = df.repartition(3)
// lets see partition structures
df3.rdd.glom().collect()
/*
Array(Array([8,8], [1,1], [2,2]), Array([5,5], [6,6]), Array([7,7], [3,3], [4,4]))
*/
// And lets see the top 4 rows this time
df3.show(4, false)
/*
+----+----+
|col1|col2|
+----+----+
|8 |8 |
|1 |1 |
|2 |2 |
|5 |5 |
+----+----+
only showing top 4 rows
*/
从上面的代码中,我们观察到Spark转到第一个分区并试图获得4行。因为只有3,它抓住了。然后移动到下一个分区,再得到一行。因此,您从show(4, false)
中看到的顺序是由底层数据分区和分区之间的索引顺序决定的。
本例使用show(4)
,但此行为可以扩展到show()
或show(20)
。
很简单。
在Spark 2+中,show()
调用showString()
将数据格式化为字符串,然后打印出来。showString()
调用getRows()
获取数据集的顶部行作为字符串集合。getRows()
调用take()
获取原始行并将其转换为字符串。take()
只是包装head()
。head()
调用limit()
来构建一个限制查询并执行它。limit()
在逻辑计划的前面添加了一个Limit(n)
节点,这实际上是一个GlobalLimit(n, LocalLimit(n))
。GlobalLimit
和LocalLimit
都是OrderPreservingUnaryNode
的子类,覆盖maxRows
(在GlobalLimit
中)或maxRowsPerPartition
(在LocalLimit
中)方法。逻辑计划现在看起来像:
GlobalLimit n
+- LocalLimit n
+- ...
这经过了Catalyst的分析和优化,如果树下的某些东西产生的行数少于限制并最终在执行策略中作为CollectLimitExec(m)
(其中m
<=n
),则限制将被删除,因此物理计划看起来像:
CollectLimit m
+- ...
CollectLimitExec
执行它的子计划,然后检查RDD有多少分区。如果没有,则返回空数据集。如果是,则运行mapPartitionsInternal(_.take(m))
获取m
的第一个元素。如果有多个,则使用mapPartitionsInternal(_.take(m))
在RDD中的每个分区上应用take(m)
,构建一个shuffle RDD,在单个分区中收集结果,然后再次应用take(m)
。
换句话说,这取决于(因为优化阶段),但在一般情况下,它采用每个分区的顶部行串联的顶部行,因此涉及所有持有数据集一部分的执行器。
好吧,也许没那么简单。
一个糟糕的问题不是你会在prod中使用什么
这是一个聪明的操作,可以根据转换来查看您所拥有的内容。
Show()实际上是Show(20)。如果只是显示,它查看第一个和连续的分区以获得20行。顺序也被优化了。计数需要完整的处理。
很多google帖子。