我在运行下面我编写的SPARK代码时出错。我试图找到所有基于关键字的向量的和。每个输入行以键(整数)开始,然后是127个浮点数字,这是一个具有127个维度的单个矢量,即每行以键和矢量开始。
from cStringIO import StringIO
class testing:
def __str__(self):
file_str = StringIO()
for n in self.vector:
file_str.write(str(n))
file_str.write(" ")
return file_str.getvalue()
def __init__(self,txt="",initial=False):
self.vector = [0.0]*128
if len(txt)==0:
return
i=0
for n in txt.split():
if i<128:
self.vector[i]=float(n)
i = i+1
continue
self.filename=n
break
def addVec(self,r):
a = testing()
for n in xrange(0,128):
a.vector[n] = self.vector[n] + r.vector[n]
return a
def InitializeAndReturnPair(string,first=False):
vec = testing(string,first)
return 1,vec
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
inp = sc.textFile("input.txt")
output = inp.map(lambda s: InitializeAndReturnPair(s,True)).cache()
output.saveAsTextFile("output")
print output.reduceByKey(lambda a,b : a).collect()
input.txt
中的示例行
6.0 156.0 26.0 3.0 1.0 0.0 2.0 1.0 15.0 113.0 53.0 139.0 156.0 0.0 0.0 0.0 156.0 29.0 1.0 38.0 59.0 0.0 0.0 28.0 4.0 9.0 1.0 0.0 0.0 9.0 83.0 13.0 1.0 0.0 9.0 42.0 7.0 41.0 71.0 74.0 123.0 35.0 17.0 7.0 156.0 27.0 6.0 33.0 11.0 2.0 0.0 35.0 4.0 4.0 1.0 3.0 4.0 0.0 0.0 2.0 19.0 45.0 17.0 47.0 2.0 7.0 59.0 15.0 11.0 156.0 14.0 1.0 4.0 11.0 2.029.0 35.0 6.0 5.0 9.0 4.0 2.0 1.0 1.0 0.0 0.0 1.0 5.0 25.0 14.0 27.0 2.0 0.0 2.0 86.0 48.0 10.0 6.0 156.0 23.0 1.0 2.0 21.0 6.0 0.0 3.0 31.0 10.0 4.0 3.0 0.0 1.0 2.0
下面是我得到的错误。此错误来自代码的最后一行,即output.reduceByKey
错误消息-http://pastebin.com/0tqiiJQm
不太确定如何解决这个问题。我尝试使用MarshalSerializer
,但它给出了相同的问题。
------------------------------答案------------------------------
我从apache用户列表中得到了相同问题的答案。基本上,在集群中运行的映射器/还原器没有类定义,我们必须通过在不同的模块中写入类并在使用配置SparkContext时附加来传递类
sc.addPyFile(os.path( HOMEDirectory + "module.py"))
谢谢大家帮我。
您可以使用numpy数组,这些数组可以很好地与spark配合使用。
import numpy as np
def row_map(text):
split_text = text.split()
# create numpy array from elements besides the first element
# which is the key
return split_text(0), np.array([float(v) for v in split_text[1:]])
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
inp = sc.textFile("input.txt")
output = inp.map(row_map).cache()
#Below line is throwing error
print output.reduceByKey(lambda a,b : np.add(a,b)).collect()
更加简洁和夸张。