当我们使用带有提示的broadcast Join时,Spark如何广播数据-正如我在使用广播提示时所看到的:它调用这个函数
def broadcast[T](df: Dataset[T]): Dataset[T] = {
Dataset[T](df.sparkSession,
ResolvedHint(df.logicalPlan, HintInfo(strategy = Some(BROADCAST))))(df.exprEnc)
}
内部调用dataset&使用ResolvedHint 设置逻辑计划
val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
但这之后会发生什么呢。这实际上是如何工作的,为此编写的代码在哪里。
- 如果我们有多个小数据集分区(我们将要广播(,spark会合并所有分区吗;然后广播
- 它是否先向驾驶员广播&然后交给执行人
- 什么是BitTorrent
关于1&2在广播期间,在驱动程序上收集加入数据,稍后会发生什么取决于加入算法
对于BroadcastHashJoin(BHJ(驱动程序构建哈希表,然后将该表分发给执行器
对于BroadcastNestedLoops广播的数据集作为数组分发给执行器
所以,正如你所看到的,初始结构并没有保存在这里,整个广播的数据集需要放入驱动程序的内存中(否则工作将失败,驱动程序上会出现oom错误(
关于3,你到底想知道什么?
在spark中有TorrentBroadcast,它是类似BitTorrent的广播实现。我对此了解不多(我从未深入挖掘过(,但如果你想了解更多,我认为你可以从这里开始:
Torrent广播文档
Torrent广播源代码
HttpBroadcast-docu-它的另一个广播算法