>我有大型数据帧:A(200g),B(20m),C(15m),D(10m),E(12m),我想将它们连接在一起:A连接B,C在同一个SparkSession中使用spark sql连接D和E**。就像:
absql:sql("select * from A a inner join B b on a.id=b.id").write.csv("/path/for/ab")
cdesql:sql("select * from C c inner join D d on c.id=d.id inner join E e on c.id=e.id").write.csv("/path/for/cde")
问题:
当我使用默认spark.sql.autoBroadcastJoinThreshold=10m
时
- ABSQL 会花费很长时间,原因是 ABSQL 偏斜。
- CDESQL 是正常的
当我设置spark.sql.autoBroadcastJoinThreshold=20m
- C,D,E将被广播,所有任务将在同一执行器中执行,仍然需要很长时间。
- 如果设置了 num-executors=200,则需要很长时间才能广播
- ABSQL 是正常的
无需更改autoBroadcastJoinThreshold
,您可以标记要广播的数据帧。通过这种方式,可以轻松决定应广播或不广播哪些数据帧。
在 Scala 中,它看起来像这样:
import org.apache.spark.sql.functions.broadcast
val B2 = broadcast(B)
B2.createOrReplaceTempView("B")
此处,数据帧 B 已标记为广播,然后注册为要与 Spark SQL 一起使用的表。
或者,这可以直接使用数据帧 API 完成,第一个连接可以写成:
A.join(broadcast(B), Seq("id"), "inner")