如何分组bykey一个RDD,与DenseVector为关键,在Spark



我创建了一个RDD,每个成员都是键值对,键是DenseVector,值是int。例如

[(DenseVector([3,4]),10),  (DenseVector([3,4]),20)]

现在我要按键k1: DenseVector([3,4])分组。我希望行为是分组关键k1的所有值,即1020。但是结果是

[(DenseVector([3,4]), 10), (DenseVector([3,4]), 20)] 

代替

[(DenseVector([3,4]), [10,20])]

请让我知道,如果我错过了什么。

相同的代码是:

#simplified version of code
#rdd1 is an rdd containing [(DenseVector([3,4]),10),  (DenseVector([3,4]),20)]
rdd1.groupByKey().map(lambda x : (x[0], list(x[1])))
print(rdd1.collect())

嗯,这是一个棘手的问题,简短的回答是你不能。要理解其中的原因,您必须深入研究DenseVector的实现。DenseVector只是NumPy float64 ndarray的包装器

>>> dv1 = DenseVector([3.0, 4.0])
>>> type(dv1.array)
<type 'numpy.ndarray'>
>>> dv1.array.dtype
dtype('float64')

由于NumPy ndarrays,不像DenseVector是可变的,不能以一种有意义的方式散列,尽管提供__hash__方法是有趣的。有一个有趣的问题涉及到这个问题(参见:numpy narray哈希可达性)。

>>> dv1.array.__hash__() is None
False
>>> hash(dv1.array)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'numpy.ndarray'

DenseVector继承了object__hash__方法,它只是基于id(给定实例的内存地址):

>>> id(d1) / 16 == hash(d1)
True

不幸的是,这意味着具有相同内容的两个DenseVectors具有不同的哈希值:

>>> dv2 = DenseVector([3.0, 4.0])
>>> hash(dv1) == hash(dv2)
False

你能做什么?最简单的方法是使用提供一致的hash实现的不可变数据结构,例如tuple:

rdd.groupBy(lambda (k, v): tuple(k))

注意:在实践中,使用数组作为键很可能是一个坏主意。对于大量的元素,散列处理可能会非常昂贵而无用。不过,如果您确实需要这样的东西,Scala似乎可以很好地工作:

import org.apache.spark.mllib.linalg.Vectors
val rdd = sc.parallelize(
    (Vectors.dense(3, 4), 10) :: (Vectors.dense(3, 4), 20) :: Nil)
rdd.groupByKey.collect

最新更新