为什么我不能在归约逻辑中引用键?



我想在我的combineByKey/reduceByKey/foldByKey中具有依赖于当前正在操作的键的逻辑。从方法签名中我可以看出,传递给这些方法的唯一参数是正在组合/简化/折叠的值。

使用一个简单的例子,我只是有一个RDD是(int, int)元组,我想要的结果是一个由tuple[0]键的RDD,其中值是最接近键的int

例如:

(1, 8)
(1, 3)
(1, -1)
(2, 4)
(2, 5)
(2, 2)
(3, 2)
(3, 4)

应缩减为:

(1, 3)
(2, 2)
(3, 2)

注意,在比较(1, 3)(1, -1)时,我不关心选择哪一个,因为它们都是相同的距离。"3"键也一样。

我想象这样做的方式是:

rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2)

但是reduce函数只有2个参数:两个要组合的值。似乎最简单的方法是在我的减速器中引用键来实现我的目标;这可能吗?

如果我尝试这个,我得到一个错误:

rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2).collect()

TypeError:()接受3个参数(给定2个)

我不是真的在寻找这个例子问题的解决方案。我想知道的是,如果有一个原因的关键是不传递给reduceByKey函数?我想这是我缺少的一些基本的map-reduce原理。


注意,我可以通过插入一个映射步骤来解决我的例子,该步骤将每个值映射到一个由值和到键的距离组成的元组:

rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd = rdd.map(lambda tup: (tup[0], tuple([tup[1], abs(tup[0] - tup[1])])))
rdd.reduceByKey(lambda v1, v2: v1 if v1[1] < v2[1] else v2).mapValues(lambda x: x[0]).collectAsMap()

我认为没有理由不传递密钥。
然而,我觉得reduceByKey API是为常见用例设计的——计算每个键的值和。到目前为止,我从未在值计算中使用过键。但那只是我的看法。

你解决的问题似乎是一个简单的聚合问题。min()groupByKey可以找到答案。我知道你不是在寻找一个解决方案,但这是我的写作方式。

from pyspark import SparkContext
sc = SparkContext()
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
reduced = rdd.groupByKey().map(lambda (k, v): (k, min(v, key=lambda e:abs(e-k))))
print(reduced.collectAsMap())
结果

{1: 3, 2: 2, 3: 2}

相关内容

  • 没有找到相关文章

最新更新