我可以在 sparkConf 中为不同的 sql 设置不同的 autoBroadcastJoinThreshold 值吗



>我有大型数据帧:A(200g),B(20m),C(15m),D(10m),E(12m),我想将它们连接在一起:A连接B,C在同一个SparkSession中使用spark sql连接D和E**。就像:

absql:sql("select * from A a inner join B b on a.id=b.id").write.csv("/path/for/ab")
cdesql:sql("select * from C c inner join D d on c.id=d.id inner join E e on c.id=e.id").write.csv("/path/for/cde")

问题:

当我使用默认spark.sql.autoBroadcastJoinThreshold=10m

  • ABSQL 会花费很长时间,原因是 ABSQL 偏斜。
  • CDESQL 是正常的

当我设置spark.sql.autoBroadcastJoinThreshold=20m

  • C,D,E将被广播,所有任务将在同一执行器中执行,仍然需要很长时间。
  • 如果设置了 num-executors=200,则需要很长时间才能广播
  • ABSQL 是正常的

无需更改autoBroadcastJoinThreshold,您可以标记要广播的数据帧。通过这种方式,可以轻松决定应广播或不广播哪些数据帧。

在 Scala 中,它看起来像这样:

import org.apache.spark.sql.functions.broadcast
val B2 = broadcast(B)
B2.createOrReplaceTempView("B")

此处,数据帧 B 已标记为广播,然后注册为要与 Spark SQL 一起使用的表。


或者,这可以直接使用数据帧 API 完成,第一个连接可以写成:

A.join(broadcast(B), Seq("id"), "inner")

相关内容

  • 没有找到相关文章

最新更新