Spark 的随机排序合并联接。一个数据帧已存储。Spark会利用这一点吗?



我记得在使用RDD时,如果一个键值RDD(rdd1(具有已知分区,那么使用另一个未分区的键值RDD执行连接(rdd2(将带来性能优势。这是因为1(只有rdd2的数据需要在网络上传输,2(通过将rdd1的密钥划分为rdd2 的密钥,rdd2的每个元素只需要传输到一个节点,而不是所有节点

我正在学习使用DataFrames进行无序排序合并联接。我正在读的书(Learning Spark,第二版(中的例子是基于user_id列连接两个DataFrames。该示例试图演示从联接操作中消除Exchange阶段,因此,在联接之前,两个数据帧都被要联接的列分段到相同数量的存储桶中。

我的问题是,如果只有一个DataFrames被装箱,会发生什么?显然,Exchange阶段将再次出现。但是,如果我们知道DataFrame1被我们想要加入的列分为N个桶,Spark会像RDD的情况一样,使用这个桶信息在网络上有效地传输DataFrame2的行吗?Spark会将DataFrame1的行保留在原来的位置,并对DataFrame2应用相同的bucketing吗?(假设N个bucket在执行器连接的分区中产生了合理数量的数据(或者,Spark是否低效地对两个DataFrames进行了混洗?

特别是,我可以想象这样一种情况,即我有一个"主"DataFrame,我需要针对它与同一列上的其他补充DataFrame执行许多独立联接。当然,只需要预存储主DataFrame,就可以看到所有联接的性能优势吗?(尽管我认为不厌其烦地收集补充数据帧也不会有什么坏处(

https://kb.databricks.com/data/bucketing.html这解释了这一切,在我总结的原始帖子上做了一些修饰。

底线:

val t1 = spark.table("unbucketed")
val t2 = spark.table("bucketed")
val t3 = spark.table("bucketed")

未装箱-装箱联接。双方都需要重新划分。

t1.join(t2, Seq("key")).explain()

使用重分区取消块连接。未扣边为正确地重新分区,并且只需要一次洗牌。

t1.repartition(16, $"key").join(t2, Seq("key")).explain()

使用不正确的重新分区取消装箱(默认值(200(-装箱连接。未扣球的一侧被错误地重新分配,两次洗牌需要。

t1.repartition($"key").join(t2, Seq("key")).explain()

带桶-带桶连接。理想情况下,双方都相同bucking和不需要洗牌

t3.join(t2, Seq("key")).explain()

因此,为了获得最佳性能,双方都需要相同的桶形。

最新更新