在 Spark 中均匀分区 RDD



我在HDFS中有一个文本文件,大约有1000万条记录。我正在尝试读取文件,对该数据进行一些转换。我正在尝试在对数据进行处理之前对数据进行统一分区。这是示例代码

var myRDD = sc.textFile("input file location")
myRDD = myRDD.repartition(10000)

当我对这些重新分区的数据进行转换时,我看到一个分区具有异常大量的记录,而其他分区的数据非常少。(分布图(

因此,只有一个执行程序的负载很高 我也尝试了并得到了相同的结果

myRDD.coalesce(10000, shuffle = true)

有没有办法在分区之间均匀分布记录。

附上该特定执行程序的随机读取大小/记录数 带圆圈的记录比其他记录要处理的记录多得多

任何帮助,谢谢。

为了处理偏差,您可以使用 distributionby(或按使用方式使用重新分区(对数据进行重新分区。对于要作为分区依据的表达式,请选择您知道将均匀分布数据的内容。

您甚至可以使用DataFrame(RDD(的主键。

即使这种方法也不能保证数据在分区之间均匀分布。这完全取决于我们分发的表达式的哈希值。 火花:如何在所有分区中均匀分配我的记录

可以使用加盐,包括添加新的"假"密钥并与当前密钥一起使用以更好地分发数据。 (这是腌制的链接(

对于小数据,我发现我需要自己强制实施统一分区。在 pyspark 中,差异很容易重现。在这个简单的示例中,我只是尝试将 100 个元素的列表并行化为 10 个偶数分区。我希望每个分区可容纳 10 个元素。相反,我得到的分布不均匀,分区大小从 4 到 22 不等:

my_list = list(range(100))
rdd = spark.sparkContext.parallelize(my_list).repartition(10)
rdd.glom().map(len).collect()
# Outputs: [10, 4, 14, 6, 22, 6, 8, 10, 4, 16]

这是我使用的解决方法,即自己索引数据,然后修改索引以查找要将行放入哪个分区:

my_list = list(range(100))
number_of_partitions = 10
rdd = (
spark.sparkContext
.parallelize(zip(range(len(my_list)), my_list))
.partitionBy(number_of_partitions, lambda idx: idx % number_of_partitions)
)
rdd.glom().map(len).collect()
# Outputs: [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]

最新更新