AggregateBykey using python sets



我遇到了以下解释 aggregateByKey 的 scala 示例。斯卡拉示例:

val pairs=sc.parallelize(Array(("a",3),("a",1),("b",7),("a",5)))
import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the  _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect

上述 scala 代码的输出为:

res5: Array[(String, scala.collection.mutable.HashSet[Int])]  =Array((b,Set(7)), (a,Set(1, 5, 3)))

我用python重写了上面的scala代码:

pair = sc.parallelize([("a",3),("a",1),("b",7),("a",5)])
sets=pair.aggregateByKey((set()),(lambda x,y:x.add(y)),(lambda x,y:x|y))
sets.collect()

我不知道出了什么问题。Python 代码返回以下错误消息:

AttributeError: 'NoneType' object has no attribute 'add'

函数add更新集合并返回NoneType(它不返回更新的集合)。然后将此NoneType传递给函数的下一次迭代,因此您会收到错误。你的函数应该返回集合:

def my_add(x, y):
    x.add(y)
    return x
sets = pair.aggregateByKey(set(), my_add, lambda x, y: x|y)
sets.collect()
    [('b', {7}), ('a', {1, 3, 5})]

另一种解决方案

sets = pair.aggregateByKey(set(), lambda x,y:x|{y}, lambda x, y: x|y)

最新更新