apache spark - Pyspark过滤前三个匹配时,执行余弦相似度



我有两个文档集合。我已经计算了每对笛卡尔积之间的余弦相似度,并得到了形式为

的RDD
(k1,(k2,c))

其中k1是第一个集合中的文档,k2是第二个集合中的文档,c是它们之间的余弦相似度。

我感兴趣的是,对于第一个集合中的每个文档k1,获取第二个集合中最相似的三个文档。我已按键执行分组:

grouped = (pairRddWithCosine
         .groupByKey()
         .map(lambda (k, v): (k, sorted(v, key=lambda x: -x[1])))
         .map(lambda (x,y): (x, y[0][0],y[0][1], y[1][0], y[1][1],         y[2][0] , y[2][1]))
      )

结果是这个组的表现很差。你能告诉我如何调整它,或者更好的是,使用一些不打乱数据的东西吗?

如果您想获得键值的总和/计数/部分,您应该避免groupByKey,因为groupByKey会打乱所有数据,以便给定键的所有值最终在相同的reducer中结束。对于大型数据集,这是非常昂贵的。相反,您应该使用reduceByKey或combineByKey。对于这些操作,您可以指定在每个分区上累积数据的函数,以及来自不同分区的累加器之间的合并函数。您可以阅读此链接获取更多详细信息:https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

我想你应该试试reduceByKey,因为你只对部分值感兴趣

k_with_top_c = rdd.reduceByKey(lambda v: sorted(v, key=lambda x: -x[1])[:3])

reduceByKey将首先尝试本地减少,因此它比groupByKey运行得快。但是,我认为在这种情况下您无法避免shuffle。

或者,我认为如果我们取

smallRdd = pairRddWithCosine.map(lambda (k1,(k2,c)))
然后

Combined = (smallRdd
        .combineByKey(lambda value: [value],
                      lambda x, value: x + [value],
                      lambda x, y : max(x,y))
        .map(lambda (x,y): (x,y[0]))
        .map(lambda x: (x,0))
       )

后跟一个连接将提供第一个匹配。我们可以通过执行leftOuterJoin()来获得pairrddwithcos中所有不是最佳匹配的元素用最优匹配得到次优

最新更新