如何在pyspark中压缩多个RDD



在spark中,有zipPartitions将多个RDD合并为一个。然而,pyspark RDD没有这样的方法。如果我多次使用zip,那么我会为组合的每个rdd创建一个新的数据帧,这不是我想要的。

如何在pyspark中将多个RDD压缩为一个?

好问题。在PySpark中引入zipPartitions是在2016年提出的,但正如您在评论中看到的那样,他们从未在性能和解决方案复杂性之间找到良好的折衷方案。这个问题现在已经结束,但我认为在不久的将来不会重新讨论。这是Joseph E.Gonzalez提出的解决方案。


使用API最快的方法是自己编写(当然性能不会那么好(。一个非常天真的zipPartitions实现是:

def zipPartitions(rdd1, rdd2, func):
rdd1_numPartitions = rdd1.getNumPartitions()
rdd2_numPartitions = rdd2.getNumPartitions()
assert rdd1_numPartitions == rdd2_numPartitions, "rdd1 and rdd2 must have the same number of partitions"

paired_rdd1 = rdd1.mapPartitionsWithIndex(lambda index, it: ((index, list(it)),))
paired_rdd2 = rdd2.mapPartitionsWithIndex(lambda index, it: ((index, list(it)),))

zipped_rdds = paired_rdd1.join(paired_rdd2, numPartitions=rdd1_numPartitions)
.flatMap(lambda x: func(x[1][0], x[1][1]))

return zipped_rdds

您可以使用进行测试

rdd1 = sc.parallelize(range(30), 3)
rdd2 = sc.parallelize(range(50), 3)
zipPartitions(rdd1, rdd2, lambda it1, it2: itertools.zip_longest(it1, it2))
.glom().collect()

参数很容易理解,按顺序,它们是第一个rdd、第二个rdd和一个接受2个分区迭代器的函数,每个rdd一个。使用assert rdd1_numPartitions == rdd2_numPartitions,我确保两个rdd都有相同数量的分区,这也是Scala版本的先决条件。然后,我在两个rdd上使用mapPartitionsWithIndex来转换,例如,一个有两个分区的rdd,从:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

[(0, [0, 1, 2, 3, 4]), (1, [5, 6, 7, 8, 9])]

注意:不幸的是,从itlist(it)的转换是必要的,因为在大多数python实现中,您不能pickle生成器,而it参数是生成器。有一个例外允许您将it转换为列表,pyspark通过非常巧妙的优化处理了这种情况,我说的是从range()创建的rdd。事实上,考虑到前面的例子,

range(10)

成为

[(0, range(0, 5)), (1, range(5, 10))]

接下来我可以join分区索引上的两个新rdd。numPartitions可以很容易地预测,因为我们之前已经断言,两个rdd必须具有相同数量的分区,因此它们处于1对1的关系中。最后,我可以应用传递的函数并使分区结果列表变平。

最新更新