火花重新分区属于单个分区



我正在学习 spark,当我使用以下表达式在 pyspark shell 中测试 repartition() 函数时,我观察到一个非常奇怪的结果:所有元素都落入repartition()函数之后的同一分区。 在这里,我使用glom()来了解 rdd 中的分区。我期待repartition()打乱元素并将它们随机分布在分区之间。仅当我使用新数量的分区重新分区时,才会发生这种情况 <= 原始分区。

在我的测试过程中,如果我>原始分区设置新的分区数,也不会观察到洗牌。我在这里做错了什么吗?

In [1]: sc.parallelize(range(20), 8).glom().collect()
Out[1]:
[[0, 1],
[2, 3],
[4, 5],
[6, 7, 8, 9],
[10, 11],
[12, 13],
[14, 15],
[16, 17, 18, 19]]
In [2]: sc.parallelize(range(20), 8).repartition(8).glom().collect()
Out[2]:
[[],
[],
[],
[],
[],
[],
[2, 3, 6, 7, 8, 9, 14, 15, 16, 17, 18, 19, 0, 1, 12, 13, 4, 5, 10, 11],
[]]
In [3]: sc.parallelize(range(20), 8).repartition(10).glom().collect()
Out[3]:
[[],
[0, 1],
[14, 15],
[10, 11],
[],
[6, 7, 8, 9],
[2, 3],
[16, 17, 18, 19],
[12, 13],
[4, 5]]

我正在使用 Spark 版本 2.1.1。

恭喜!您刚刚重新发现了 SPARK-21782 -当 numPartitions 是 2 的幂时,重新分区会产生倾斜

目前,重新分区(启用随机的合并)的算法如下:

对于每个初始分区索引,生成位置为 (new Random(index)).nextInt(numPartitions) 然后,对于初始分区索引中的元素编号 k,将其放在新的分区位置 + k(模数分区)。

因此,基本上元素在 numPartitions 存储桶上大致相等 - 从数字位置 + 1 的存储桶开始。

请注意,将为每个初始分区索引创建一个新的 Random 实例,该实例具有固定的种子索引,然后丢弃。因此,对于世界上任何RDD的每个指数,该位置都是确定性的。此外,nextInt(bound) 实现有一个特殊情况,当绑定是 2 的幂时,它基本上是从初始种子中获取几个最高位,只有最少的加扰。

PySpark 使情况变得更糟,因为它使用默认批大小等于 10 的批处理序列化程序,因此每个分区上的项目数量较少,所有项目都会被随机排列到相同的输出。

好消息是,由于谢尔盖·谢列布里亚科夫,它已经在 Spark 2.3 中得到解决。

啊,我认为与底层分区程序有关。我尝试了更大的数字,现在结果更有意义。

In [95]: [len(lst) for lst in sc.parallelize(range(1000), 8).glom().collect()]
Out[95]: [125, 125, 125, 125, 125, 125, 125, 125]
In [96]: [len(lst) for lst in sc.parallelize(range(1000), 8).repartition(10).glom().collect()]
Out[96]: [95, 95, 100, 105, 95, 95, 100, 105, 105, 105]
In [97]: [len(lst) for lst in sc.parallelize(range(1000), 8).repartition(5).glom().collect()]
Out[97]: [190, 195, 205, 210, 200]

最新更新