I have an RDD with 2 partitions, holding key-value pairs:
rdd5.glom().collect()
[[(u'hive', 1), (u'python', 1), (u'spark', 1), (u'hive', 1), (u'spark', 1), (u'python', 1)], [(u'spark', 1), (u'java', 1), (u'java', 1), (u'spark', 1)]]
When I run aggregateByKey:
rdd6=rdd5.aggregateByKey((0,0), lambda acc,val: (acc[0]+1,acc[1]+val), lambda acc1,acc2 : (acc1[1]+acc2[1])/acc1[0]+acc2[0])
it does not give me the expected result:
Output:
[(u'python', (2, 2)), (u'spark', 1), (u'java', (2, 2)), (u'hive', (2, 2))]
Expected:
[(u'python', 1), (u'spark', 1), (u'java', 1), (u'hive', 1)]
I can see that when a key exists in only one partition it does not give me the expected output. What changes should I make to achieve this?
OK, here is how to do this using both reduceByKey and aggregateByKey.
The problem you are running into with aggregateByKey is that the last function is responsible for merging two accumulators. It must return the same structure as all the other functions, so that when it is merged with yet another accumulator (from another partition) it will work again.
It is very similar to combineByKey, see here.
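To see why the merge function must preserve the accumulator shape, here is a minimal pure-Python sketch of aggregateByKey's two-phase semantics (the helper name `aggregate_by_key` is illustrative, not a Spark API; no cluster needed):

```python
from collections import defaultdict

def aggregate_by_key(partitions, zero, seq_op, comb_op):
    # Phase 1: within each partition, fold values into accumulators with seq_op.
    per_partition = []
    for part in partitions:
        accs = defaultdict(lambda: zero)
        for key, val in part:
            accs[key] = seq_op(accs[key], val)
        per_partition.append(accs)
    # Phase 2: merge accumulators across partitions with comb_op.
    # comb_op only runs for keys seen in more than one partition,
    # and its result may be merged again, so it must keep the shape.
    merged = {}
    for accs in per_partition:
        for key, acc in accs.items():
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged

partitions = [
    [(u'hive', 1), (u'python', 1), (u'spark', 1),
     (u'hive', 1), (u'spark', 1), (u'python', 1)],
    [(u'spark', 1), (u'java', 1), (u'java', 1), (u'spark', 1)],
]

result = aggregate_by_key(
    partitions, (0, 0),
    lambda acc, val: (acc[0] + 1, acc[1] + val),   # seqOp: (count, sum)
    lambda a, b: (a[0] + b[0], a[1] + b[1]))       # combOp keeps the (count, sum) shape
# u'spark' appears in both partitions, so combOp runs for it: result[u'spark'] == (4, 4)
```

Note that keys living in a single partition (like u'hive' here) never pass through the merge function at all, which is exactly why a merge function that returns a different structure produces the mixed output in the question.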
rdd = sc.parallelize([(u'hive', 1), (u'python', 1), (u'spark', 1),
                      (u'hive', 1), (u'spark', 1), (u'python', 1),
                      (u'spark', 1), (u'java', 1), (u'java', 1), (u'spark', 1)])
print rdd.aggregateByKey((0, 0),
                         lambda acc, val: (acc[0] + 1, acc[1] + val),
                         lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])).collect()
print rdd.mapValues(lambda x: (1, x)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
[(u'spark', (4, 4)), (u'java', (2, 2)), (u'hive', (2, 2)), (u'python', (2, 2))]
[(u'spark', (4, 4)), (u'java', (2, 2)), (u'hive', (2, 2)), (u'python', (2, 2))]
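The reduceByKey variant relies on the same shape-preserving idea: mapValues first lifts every value into a (count, sum) pair, after which a single commutative function merges pairs, no matter which partition they came from. A plain-Python sketch of that pattern (names are illustrative):

```python
from functools import reduce
from itertools import groupby

pairs = [(u'hive', 1), (u'python', 1), (u'spark', 1), (u'hive', 1),
         (u'spark', 1), (u'python', 1), (u'spark', 1), (u'java', 1),
         (u'java', 1), (u'spark', 1)]

# mapValues step: lift each value v into a (count, sum) accumulator.
lifted = [(k, (1, v)) for k, v in pairs]

# reduceByKey step: group by key, then fold with one shape-preserving function.
by_key = groupby(sorted(lifted), key=lambda kv: kv[0])
result = {k: reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]),
                    (acc for _, acc in group))
          for k, group in by_key}
# e.g. result[u'spark'] == (4, 4)
```

Because every input is already an accumulator after the lift, one function suffices, which is why reduceByKey never has the mismatched-structure problem in the first place.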
If you are trying to compute an average, you can tack another mapValues onto the end, like so:
print rdd.aggregateByKey((0, 0),
                         lambda acc, val: (acc[0] + 1, acc[1] + val),
                         lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) \
         .mapValues(lambda x: x[1] * 1.0 / x[0]) \
         .collect()
[(u'spark', 1.0), (u'java', 1.0), (u'hive', 1.0), (u'python', 1.0)]