reduceByKey in spark python with tuple values



我的RDD为(key, (val1,val2))。对于这个rdd,我想应用reduceByKey函数,我的要求是针对单个键找到最小val2,并提取结果最小val2val1。 例如:(1,(a,4)),(2,(b,3)),(1,(c,2)),(2,(d,1))在这种情况下,我希望结果集为(1,(c,2)),(2,(d,1))

我脑海中的想法是python代码,但是在这里我得到了第一个val1,而不是对应于最小val2val1

rdd2 = rdd1.map(lambda x:(x[0],(x[1],x[3])))
rdd3 = rdd2.reduceByKey(lambda x,y:(x[0],min(x[1],y[1])))

请帮助我修改代码以获得所需的结果。

您所需要的只是在reduceByKey函数中if else

rdd3 = rdd2.reduceByKey(lambda x, y: x if(x[1] < y[1]) else y)

这应该给你你想要的输出,作为

#(1, ('c', 2))
#(2, ('d', 1))

我希望答案对您有所帮助

minkey参数一起使用:

from functools import partial
from operator import itemgetter
rdd.reduceByKey(partial(min, key=itemgetter(1)))

在这里,我将通过使用 scala 获得所需的结果,因为我现在已经学习了更多 scala,我现在可以回答我自己的问题,如下所示:

val list = List((1,('a',4)),(2,('b',3)),(1,('c',2)),(2,('d',1)))
val rdd = sc.parallelize(list)
rdd.reduceByKey((rec1,rec2) => if(rec1._2 > rec2._2) rec2 else rec1).foreach(println)

输出:

(2,(d,1((

(1,(c,2((

我在这里提到了这段代码,因为其他人可以参考并发现它很有用。 谢谢。。。

最新更新