我的RDD为(key, (val1,val2))
。对于这个rdd
,我想应用reduceByKey
函数,我的要求是针对单个键找到最小val2
,并提取结果最小val2
的val1
。 例如:(1,(a,4)),(2,(b,3)),(1,(c,2)),(2,(d,1))
在这种情况下,我希望结果集为(1,(c,2)),(2,(d,1))
我脑海中的想法是python代码,但是在这里我得到了第一个val1
,而不是对应于最小val2
val1
。
rdd2 = rdd1.map(lambda x:(x[0],(x[1],x[3])))
rdd3 = rdd2.reduceByKey(lambda x,y:(x[0],min(x[1],y[1])))
请帮助我修改代码以获得所需的结果。
您所需要的只是在reduceByKey
函数中if else
为
rdd3 = rdd2.reduceByKey(lambda x, y: x if(x[1] < y[1]) else y)
这应该给你你想要的输出,作为
#(1, ('c', 2))
#(2, ('d', 1))
我希望答案对您有所帮助
将min
与key
参数一起使用:
from functools import partial
from operator import itemgetter
rdd.reduceByKey(partial(min, key=itemgetter(1)))
在这里,我将通过使用 scala 获得所需的结果,因为我现在已经学习了更多 scala,我现在可以回答我自己的问题,如下所示:
val list = List((1,('a',4)),(2,('b',3)),(1,('c',2)),(2,('d',1)))
val rdd = sc.parallelize(list)
rdd.reduceByKey((rec1,rec2) => if(rec1._2 > rec2._2) rec2 else rec1).foreach(println)
输出:
(2,(d,1((
(1,(c,2((
我在这里提到了这段代码,因为其他人可以参考并发现它很有用。 谢谢。。。