我正在使用以下代码创建集群模型:
import pandas as pd
pandas_df = pd.read_pickle('df_features.pickle')
spark_df = sqlContext.createDataFrame(pandas_df)
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=2, seed=1.0)
modela = kmeans.fit(spark_df)
然后我得到了错误:
AnalysisException Traceback (most recent call last)
<ipython-input-26-00e1e2ba1983> in <module>()
3
4 kmeans = KMeans(k=2, seed=1.0)
----> 5 modela = kmeans.fit(spark_df)
/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/base.pyc in fit(self, dataset, params)
62 return self.copy(params)._fit(dataset)
63 else:
---> 64 return self._fit(dataset)
65 else:
66 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/wrapper.pyc in _fit(self, dataset)
211
212 def _fit(self, dataset):
--> 213 java_model = self._fit_java(dataset)
214 return self._create_model(java_model)
215
/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/wrapper.pyc in _fit_java(self, dataset)
208 """
209 self._transfer_params_to_java()
--> 210 return self._java_obj.fit(dataset._jdf)
211
212 def _fit(self, dataset):
/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:
/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u"cannot resolve '`features`' given input columns: [field_1, field_2, field_3, field_4, field_5, field_6, field_7];"
我是否创建了错误的数据帧?有人知道我错过了什么吗?谢谢
您需要使用VectorAssemblerhttp://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=spark_df.columns, outputCol="features")
vector_df = vecAssembler.transform(spark_df)
kmeans = KMeans().setK(n_clusters).setSeed(1)
model = kmeans.fit(vector_df )
对于kmean,它需要DenseVectors的rdd。因此,您需要创建DenseVectors的rdd,其中每个向量对应于数据帧的一行。因此,假设您的数据帧有三列要输入到K Means模型中,我会将其重构为以下行:
spark_rdd = spark_df.rdd.sortByKey()
modelInput = spark_rdd.map(lambda x: Vectors.dense(x[0],x[1],x[2])).sortByKey()
modelObject = Kmeans.train(modelInput,2)
然后,如果您想将结果从RDD返回到数据帧中,我会做如下操作:
labels = modelInput.map(lambda x: model.predict(x))
results = labels.zip(spark_rdd)
resultFrame = results.map(lambda x: Row(Label = x[0], Column1 = x[0][1], Column2 = x[1][1],Column3 = x[1][2]).toDF()
data = [(Vectors.dense( [x[0], x[1]]),) for x in pandas_df.iloc[0:,2:4].values]
spark_df = spark.createDataFrame(data, ["features"])
kmeans = KMeans(k=2, seed=1.0)
modela = kmeans.fit(spark_df)
有关更多详细信息,请参阅官方手册