我正在尝试连接两个数据集。一种类型(Id,salesRecord),另一种类型(Id,Name)。第一个数据集由 HashPartitioner 分区,第二个数据集由自定义分区程序分区。当我通过id加入这些RDD并尝试查看保留了哪些分区程序信息时,我随机看到有时joinRDD显示自定义分区程序,有时显示哈希分区程序。 我在更改分区数的同时收到了不同的分区结果。
根据 Learning Spark 书籍,rdd1.join(rdd2) 保留了 rdd1 中的分区信息。
这是代码。
val hashPartitionedRDD = cusotmerIDSalesRecord.partitionBy(new HashPartitioner(10))
println("hashPartitionedRDD's partitioner " + hashPartitionedRDD.partitioner) // Seeing Instance of HashParitioner
val customPartitionedRDD = customerIdNamePair1.partitionBy(new CustomerPartitioner)
println("customPartitionedRDD partitioner " + customPartitionedRDD.partitioner) // Seeing instance of CustomPartitioner
val expectedHash = hashPartitionedRDD.join(customPartitionedRDD)
val expectedCustom = customPartitionedRDD.join(hashPartitionedRDD)
println("Expected Hash " + expectedHash.partitioner) // Seeing instance of Custom Partitioner
println("Expected Custom " + expectedCustom.partitioner) //Seeing instance of Custom Partitioner
// Just to add more to it when number of partitions of both the data sets I made equal I am seeing the reverse results. i.e.
// expectedHash shows CustomPartitioner and
// expectedCustom shows Hashpartitioner Instance.
join
方法在内部调用,Partitioner.defaultPartitioner()
方法。
根据defaultPartitioner
的定义:
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(bySize.head.partitions.size)
}
}
}
如果你仔细观察:
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
它根据分区数(按降序)启动for-loop
(或搜索)。因此,如果两个分区RDDs
都有自己的分区程序,则将选择分区数较多的分区程序。
编辑:您提出的关于查看reverse
行为的问题非常简单。在这里,如果两者具有相同数量的分区,则others
将位于Seq
的顶部。因此,将选择参数RDD
的分区程序。
(Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
这种行为是可以解释的,但可能并不直观。