合并排序联接在Spark中是如何工作的?为什么它可以抛出OOM



我想深入了解Spark中合并排序联接的概念。我理解总体思路:这与合并排序算法中的方法相同:取2个排序的数据集,比较第一行,写最小的一行,重复。我也了解如何实现分布式合并排序

但我不知道它是如何在Spark中实现的,关于分区和执行器的概念。

这是我的看法

  1. 假设我需要连接两个表A和B。如果这很重要的话,表是通过Spark SQL从Hive读取的
  2. 默认情况下,Spark使用200个分区
  3. Spark将计算联接键范围(从minKey(A,B(到maxKey(A、B()并将其分成200个部分。要按关键字拆分的两个数据集分为200个部分:A分区和B分区
  4. 与同一密钥相关的每个A分区和每个B分区被发送到同一执行器,并且在那里彼此分离
  5. 现在,200个执行器可以将200个A分区与200个B分区连接起来并保证它们共享相同的密钥范围
  6. 通过合并排序算法进行连接:从中获取最小密钥A分区,与B分区中的最小密钥进行比较,写入匹配或迭代
  7. 最后,我有200个连接的数据分区

这有意义吗?

问题:键倾斜。如果某个键范围包含50%的数据集键,则某些执行器将受到影响,因为太多的行将进入同一分区。它甚至可能在OOM中失败,同时试图在内存中对太大的A分区或B分区进行排序(我不明白为什么Spark不能像Hadoop那样用磁盘溢出进行排序?…(或者它可能失败是因为它试图将两个分区都读取到内存中进行连接?

所以,这是我的猜测。你能纠正我并帮助我理解Spark的工作方式吗?

这是MPP数据库上联接的常见问题,Spark也不例外。正如您所说,要执行联接,同一联接键值的所有数据都必须位于同一位置,因此,如果联接键上存在偏斜分布,则数据的分布就会偏斜,并且一个节点会过载。

如果连接的一侧很小,则可以使用映射侧连接。Spark查询规划器确实应该为您做这件事,但它是可调的——不确定它有多新,但它看起来很有用。

您是否在两个表上都运行了ANALYZE TABLE?

如果两边都有一个不会破坏联接语义的键,那么可以将其包含在联接中。

为什么Spark不能像Hadoop那样使用磁盘溢出进行排序?

Spark合并排序联接确实溢出到磁盘。看看Spark SortMergeJoinExec类,它使用ExternalAppendOnlyUnsafeRowArray,它被描述为:

UnsafeRows的一个仅追加数组,它严格地将内容保留在内存中的数组中,直到达到numRowsInMemoryBufferThreshold之后,它将切换到一种模式,在达到numRowsSpillThreshold后(或者如果内存消耗过多,则在达到之前(刷新到磁盘

这与在Web UI的联接操作中看到任务溢出到磁盘的体验一致。

为什么[merge-sort-join]可以抛出OOM?

来自Spark内存管理概述:

Spark的shuffle操作(sortByKey、groupByKey、reduceByKey、join等(在每个任务中构建一个哈希表来执行分组,该哈希表通常很大。这里最简单的解决方案是提高并行度,使每个任务的输入集更小。

即,在join的情况下,增加spark.sql.shuffle.partitions以减小分区和生成的哈希表的大小,并相应地降低OOM的风险。

最新更新