Pyspark 中的 AggregateByKey 没有给出预期的输出



我有一个RDD,它有2个分区和键值对数据作为值:

rdd5.glom().collect()
[[(u'hive', 1(, (u'python', 1(, (u'spark', 1(,

(u'hive', 1(, (u'spark', 1(, (u'python', 1(], [(u'spark', 1(, (u'java', 1(, (u'java', 1(, (u'spark', 1(]]

当我表演aggregateByKey

rdd6=rdd5.aggregateByKey((0,0), lambda acc,val: (acc[0]+1,acc[1]+val), lambda acc1,acc2 : (acc1[1]+acc2[1])/acc1[0]+acc2[0])

它没有给我预期的结果:

输出:

[(u'python', (2, 2((, (u'spark', 1(, (u'java', (2, 2

((, (u'hive', (2, 2((]

预期:

[(u'python', 1(, (u'spark', 1(, (u'java', 1

(, (u'hive', 1(]

我可以看到密钥仅存在于一个分区中,没有给我预期的输出。我应该进行哪些更改才能实现这一目标?

好的,下面是使用reduceByKey和aggregateByKey执行此操作的方法。

您使用 aggregateByKey 遇到的问题是最后一个函数负责添加两个累加器。它必须返回与所有其他函数相同的结构,以便在添加另一个新累加器(从另一个分区(时它将再次工作。

它与 combineByKey 非常相似,请参阅此处。

rdd = sc.parallelize([(u'hive', 1), (u'python', 1), (u'spark', 1),
(u'hive', 1), (u'spark', 1), (u'python', 1), (u'spark', 1), (u'java', 1), (u'java', 1), (u'spark', 1)])
print rdd.aggregateByKey( (0, 0), lambda acc, val: (acc[0] + 1,acc[1] + val),
lambda acc1, acc2 : (acc1[0] + acc2[0], acc1[1] + acc2[1])).collect()
print rdd.mapValues(lambda x: (1, x)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
[(u'spark', (4, 4((, (u'java', (2, 2((, (u'hive', (2,

2((, (u'python', (2, 2((]

[(u'spark', (4, 4((, (u'java', (2, 2((, (u'hive', (2,2((, (u'python', (2, 2((]

如果您尝试平均值,则可以在末尾添加另一个mapValues,如下所示:

print rdd.aggregateByKey( (0, 0),
lambda acc, val: (acc[0] + 1,acc[1] + val),
lambda acc1, acc2 : (acc1[0] + acc2[0], acc1[1] + acc2[1]))
.mapValues(lambda x: x[1] * 1.0 / x[0])
.collect()
[(u'spark', 1.0(, (u'java', 1.0(, (u'hive', 1.0

(, (u'python', 1.0(]

最新更新