Summing DenseVectors after groupByKey() works in the PySpark shell, but not with spark-submit



Here is some sample code of what I am trying to do:

First, I build sentence feature vectors using Word2Vec:

from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = sqlContext.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
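
For context: the pyspark shell pre-creates sc and sqlContext, but a script run with spark-submit has to build them itself. A minimal sketch, assuming Spark 1.x and the SQLContext API; the app name is just an example:

from pyspark import SparkContext
from pyspark.sql import SQLContext

# Not predefined under spark-submit, unlike in the pyspark shell
sc = SparkContext(appName="word2vec_vector_sum")  # example app name, an assumption
sqlContext = SQLContext(sc)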
Then I convert the output result to an RDD:
result_rdd=result.select("result").rdd
rdd_with_sample_ids_attached = result_rdd.map(lambda x: (1, x[0]))
rdd_with_sample_ids_attached.collect()

Output: [(1, DenseVector([0.0472, -0.0078, 0.0377])), (1, DenseVector([-0.0253, -0.0171, 0.0664])), (1, DenseVector([0.0101, 0.0324, 0.0158]))]

Now I do a groupByKey() and compute the sum of the DenseVectors in each group as follows:

rdd_sum = rdd_with_sample_ids_attached.groupByKey().map(lambda x: (x[0], sum(x[1])))
rdd_sum.collect()

Output: [(1, DenseVector([0.0319, 0.0075, 0.1198]))]

As shown, this code works perfectly in the pyspark shell. However, when I submit the same code with spark-submit, I get the following error:
File "/mnt1/yarn/usercache/hadoop/appcache/application_1465567204576_0170/container_1465567204576_0170_01_000002/pyspark.zip/pyspark/sql/functions.py", line 39, in _
   jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'

I have already tried repartitioning the RDD into a single partition and get the same error. Can anyone help?
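
For reference, the repartitioning attempt looked roughly like this; the exact variable names are my reconstruction, not the original script:

# Collapse to a single partition before grouping; still fails the same way under spark-submit
rdd_single = rdd_with_sample_ids_attached.repartition(1)
rdd_sum = rdd_single.groupByKey().map(lambda x: (x[0], sum(x[1])))
rdd_sum.collect()  # raises the same AttributeError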

Figured it out! The problem was that I had the following import in my script:

from pyspark.sql.functions import *

This imports a sum() function that shadows Python's built-in sum(). When I removed this import, everything worked fine. While Python's built-in sum() can add up DenseVectors, the sum() imported from pyspark.sql.functions cannot.
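A safer pattern, if the SQL functions are still needed elsewhere, is to import the module under an alias so the built-in sum() is never shadowed. This is a suggested sketch, not part of my original script:

from pyspark.sql import functions as F  # keeps the SQL functions namespaced, built-in sum() intact

rdd_sum = rdd_with_sample_ids_attached.groupByKey().map(
    lambda x: (x[0], sum(x[1]))  # sum() here is the Python built-in, which can add DenseVectors
)
# F.sum("col") remains available for DataFrame aggregations without colliding with the built-in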
