SPARK df.show()函数算法



最近在接受采访时被问到spark df.show()函数的算法。火花将如何决定的遗嘱执行人/执行人将获取的记录吗?

在不破坏@thebluephantom和@Hristo Iliev的答案(每个人都对幕后发生的事情有一些见解)的情况下,我也想把我的答案添加到这个列表中。

我得出了相同的结论,尽管是通过观察底层分区的行为。

分区有一个与之关联的索引。如下面的代码所示:

(摘自spark原始源代码)

trait Partition extends Serializable {
def index: Int
:

分区之间是有顺序的

正如在其他答案中已经提到的,df.show()df.show(20)或前20行相同。因此,底层分区索引决定了这些行来自哪个分区(以及执行器)。

分区索引在读取时分配,或者在shuffle时(重新分配)。

下面是一些代码来查看这种行为:

val df = Seq((5,5), (6,6), (7,7), (8,8), (1,1), (2,2), (3,3), (4,4)).toDF("col1", "col2")
// above sequence is defined out of order - to make behaviour visible
// see partition structure
df.rdd.glom().collect()
/* Array(Array([5,5]), Array([6,6]), Array([7,7]), Array([8,8]), Array([1,1]), Array([2,2]), Array([3,3]), Array([4,4])) */
df.show(4, false)
/*
+----+----+
|col1|col2|
+----+----+
|5   |5   |
|6   |6   |
|7   |7   |
|8   |8   |
+----+----+
only showing top 4 rows
*/

在上面的代码中,我们看到8个分区(每个内部数组都是一个分区)——这是因为当我们创建一个数据框时,spark默认为8个分区。

现在让我们重新划分数据框。

// Now let's repartition df
val df2 = df.repartition(2)
// lets see the partition structure
df2.rdd.glom().collect()
/* Array(Array([5,5], [6,6], [7,7], [8,8], [1,1], [2,2], [3,3], [4,4]), Array()) */
// lets see output
df2.show(4,false)
/*
+----+----+
|col1|col2|
+----+----+
|5   |5   |
|6   |6   |
|7   |7   |
|8   |8   |
+----+----+
only showing top 4 rows
*/

在上面的代码中,前4行来自第一个分区(它实际上包含原始数据的所有元素)。还要注意分区大小的倾斜,因为没有提到分区列。

现在让我们尝试创建3个分区

val df3 = df.repartition(3)
// lets see partition structures
df3.rdd.glom().collect()
/*
Array(Array([8,8], [1,1], [2,2]), Array([5,5], [6,6]), Array([7,7], [3,3], [4,4]))
*/
// And lets see the top 4 rows this time
df3.show(4, false)
/*
+----+----+
|col1|col2|
+----+----+
|8   |8   |
|1   |1   |
|2   |2   |
|5   |5   |
+----+----+
only showing top 4 rows
*/

从上面的代码中,我们观察到Spark转到第一个分区并试图获得4行。因为只有3,它抓住了。然后移动到下一个分区,再得到一行。因此,您从show(4, false)中看到的顺序是由底层数据分区和分区之间的索引顺序决定的。

本例使用show(4),但此行为可以扩展到show()show(20)

很简单。

在Spark 2+中,show()调用showString()将数据格式化为字符串,然后打印出来。showString()调用getRows()获取数据集的顶部行作为字符串集合。getRows()调用take()获取原始行并将其转换为字符串。take()只是包装head()head()调用limit()来构建一个限制查询并执行它。limit()在逻辑计划的前面添加了一个Limit(n)节点,这实际上是一个GlobalLimit(n, LocalLimit(n))GlobalLimitLocalLimit都是OrderPreservingUnaryNode的子类,覆盖maxRows(在GlobalLimit中)或maxRowsPerPartition(在LocalLimit中)方法。逻辑计划现在看起来像:

GlobalLimit n
+- LocalLimit n
+- ...

这经过了Catalyst的分析和优化,如果树下的某些东西产生的行数少于限制并最终在执行策略中作为CollectLimitExec(m)(其中m<=n),则限制将被删除,因此物理计划看起来像:

CollectLimit m
+- ...

CollectLimitExec执行它的子计划,然后检查RDD有多少分区。如果没有,则返回空数据集。如果是,则运行mapPartitionsInternal(_.take(m))获取m的第一个元素。如果有多个,则使用mapPartitionsInternal(_.take(m))在RDD中的每个分区上应用take(m),构建一个shuffle RDD,在单个分区中收集结果,然后再次应用take(m)

换句话说,这取决于(因为优化阶段),但在一般情况下,它采用每个分区的顶部行串联的顶部行,因此涉及所有持有数据集一部分的执行器。

好吧,也许没那么简单。

一个糟糕的问题不是你会在prod中使用什么

这是一个聪明的操作,可以根据转换来查看您所拥有的内容。

Show()实际上是Show(20)。如果只是显示,它查看第一个和连续的分区以获得20行。顺序也被优化了。计数需要完整的处理。

很多google帖子。

最新更新