我想在我的combineByKey
/reduceByKey
/foldByKey
中具有依赖于当前正在操作的键的逻辑。从方法签名中我可以看出,传递给这些方法的唯一参数是正在组合/简化/折叠的值。
使用一个简单的例子,我只是有一个RDD是(int, int)
元组,我想要的结果是一个由tuple[0]
键的RDD,其中值是最接近键的int
。
(1, 8)
(1, 3)
(1, -1)
(2, 4)
(2, 5)
(2, 2)
(3, 2)
(3, 4)
应缩减为:
(1, 3)
(2, 2)
(3, 2)
注意,在比较(1, 3)
和(1, -1)
时,我不关心选择哪一个,因为它们都是相同的距离。"3"键也一样。
我想象这样做的方式是:
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2)
但是reduce
函数只有2个参数:两个要组合的值。似乎最简单的方法是在我的减速器中引用键来实现我的目标;这可能吗?
如果我尝试这个,我得到一个错误:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2).collect()
TypeError:()接受3个参数(给定2个)
我不是真的在寻找这个例子问题的解决方案。我想知道的是,如果有一个原因的关键是不传递给reduceByKey
函数?我想这是我缺少的一些基本的map-reduce原理。
注意,我可以通过插入一个映射步骤来解决我的例子,该步骤将每个值映射到一个由值和到键的距离组成的元组:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd = rdd.map(lambda tup: (tup[0], tuple([tup[1], abs(tup[0] - tup[1])])))
rdd.reduceByKey(lambda v1, v2: v1 if v1[1] < v2[1] else v2).mapValues(lambda x: x[0]).collectAsMap()
我认为没有理由不传递密钥。
然而,我觉得reduceByKey
API是为常见用例设计的——计算每个键的值和。到目前为止,我从未在值计算中使用过键。但那只是我的看法。
你解决的问题似乎是一个简单的聚合问题。min()
和groupByKey
可以找到答案。我知道你不是在寻找一个解决方案,但这是我的写作方式。
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
reduced = rdd.groupByKey().map(lambda (k, v): (k, min(v, key=lambda e:abs(e-k))))
print(reduced.collectAsMap())
结果{1: 3, 2: 2, 3: 2}