我正在学习如何用Python中的Spark编程,并努力解决一个问题。
问题是我加载了一个 PythonRDD 作为 id 和描述:
pythonRDD.take(1)
## [('b000jz4hqo', ['clickart', '950', '000', 'premier', 'image', 'pack', 'dvd', 'rom', 'broderbund'])]
和 ParallelCollectionRDD 加载为 id 和描述:
paraRDD.take(1)
## [('b000jz4hqo', ['clickart', '950', '000', 'premier', 'image', 'pack', 'dvd', 'rom', 'broderbund'])]
我可以像这样对paraRDD进行计数:
paraRDD.map(lambda l: (l[0],len(l[1]))).reduce(lambda a,b: a[1] + b[1])
或者干脆
paraRDD.reduce(lambda a,b: len(a[1]) + len(b[1]))
但是在pythonRDD上它遇到了错误,该错误说:
"TypeError: 'int' 对象没有属性 'getitem'"。
def countTokens(vendorRDD):
return vendorRDD.map(lambda l: (l[0],len(l[1]))).reduce(lambda a,b: a[1] + b[1])
关于这是如何发生的任何想法将不胜感激?!
PythonRDD
和 ParallelCollectionRDD
之间的区别在这里完全无关紧要。你的代码是错误的。
reduce
方法采用具有以下签名的关联和交换函数:
(T, T) => T
换句话说,参数和返回的对象必须是相同的类型,并且操作顺序和括号不会影响最终结果。传递给reduce
的函数根本不满足这些条件。
要使其工作,您需要这样的东西:
rdd.map(lambda l: len(l[1])).reduce(lambda x, y: x + y)
甚至更好:
from operator import add
rdd.values().map(len).reduce(add)