有效地处理RDD列表



我正在寻找一种方法来处理RDD列表,同时最大限度地减少我需要执行的混洗次数。

我有一个巨大的RDD,我已经将其分解为单独的桶,并为内部比较点进行了点菜

到目前为止,我的基本想法是:

val r:RDD[Int, T]
val buckets = List(0 to n).map(a => r.filter(key == a))
buckets.map(_.cartesian().map(//internal comparison))

但后来我迷失了方向,因为如果我按照buckets.map(_.reduce(//reduction))的思路做一些事情,会引发大量的洗牌,这让我觉得效率低下,尤其是考虑到我最终会想减少所有这些减少的结果。

有人对我如何处理这个问题有什么建议吗?

编辑:

我的总体目标是用比简单的笛卡尔算法更聪明的算法来比较RDD中的所有值。

基本思想是对值进行桶化,这样我就可以保证某些桶彼此不相似,所以我只需要比较可能彼此相似的桶。

在现实生活中,水桶会是这样的:

val partitions:List[List[Int]] = clusterSimilarBuckets()
val buckets = partitions.map(a => r.filter{case(key, v) =. a.contains(key)})

我的猜测是,您正试图围绕哈希联接的概念进行一些工作。

我的解决方案是通过一个基于bucket算法的自定义分区器进行重新分区(如果你计划减少分区数量,甚至可以更好地使用联合)。如果您计划使用同一个bucket进行多次比较,我会考虑复制bucket。考虑到这一步骤确实涉及到一些问题。

之后,您可以使用mapPartitions运行一种昂贵的算法来比较分区中的每个元素,因为前面的步骤,这些元素都来自类似的桶。

您还应该考虑根据您的用例调整Locality Sensitive Hashing,因为它使用了许多哈希器,因此两个元素很有可能属于同一个bucket。对LSH的更好解释是《挖掘海量数据集》一书的第3章。快速谷歌搜索还揭示了Spark的LSH实现。

最新更新