星火广播vs加盟



我有一个大的RDD(RDD_1)和它的一个子集(RDD_2)。我想在不同的字段上连接RDD_1和RDD_2。

假设记录的格式为{"first_name":<>,"last_name":lt;>}。我们想找到所有与所有"杰克"姓氏相同的名字。

names = sc.textfile(RAW_DATA)
jack = names.filter(lambda v: v['first_name'] == 'jack')

选项1

jack_last_names = jack.map(operator.itergetter('last_name').distinct().collect()
last_names_bc = sc.broadcast(set(jack_last_names))
final = names.filter(lambda v:v['last_name'] in last_names_bc.value)

目前,我广播rdd_2并用它过滤rdd_1。问题是,为了广播rdd_ 2,我必须首先在驱动程序上收集()它,这会导致驱动程序内存不足。

有没有一种方法可以在不首先在驱动程序上收集()的情况下广播RDD?

选项2

final = jack.keyBy(operator.itemgetter('last_name').join(names.keyBy(operator.itemgetter('last_name')

我的另一个选择是rdd_1.join(rdd_2),但rdd_1太大了,无法洗牌。

当我们运行rdd_1.join(rdd_2)时,rdd_1和rdd_2都会进行哈希分区和混洗吗?

谢谢!

有没有一种方法可以在不首先在驱动程序上收集()的情况下广播RDD?

不,没有,即使有也不能解决你的问题。

  • 无法执行嵌套操作或转换
  • 如果您可以创建一个没有集合的本地广播变量,那么您将面临同样的问题,但在worker上

当我们运行rdd_1.join(rdd_2)时,rdd_1和rdd_2都会进行哈希分区和混洗吗?

从技术上讲,在PySpark中,它需要uniongroupByKey,因此这意味着所有数据都必须进行混洗。

在实践中,我只会接受洗牌的代价。一般来说,写任何复杂的应用程序都不可能完全避免洗牌。此外,它并不比广播类似数量的数据,甚至通过复制将数据复制到分布式文件系统更昂贵。

最新更新