K-means clustering with PySpark



I have a very large dataframe and I have to use 8 specific columns from it, whose values are either "agree" or "disagree" (some entries are NaN). Based on these 8 columns, I need to create a new column that tells which cluster (1-8) each row belongs to (preferably using K-means clustering).

Is this possible in PySpark?

P.S.: I am new to PySpark, so any help would be greatly appreciated.

column:1     column:2    column:3    column:4    column:5  ... column:8    new_column_required?
--------     --------    --------    --------    --------      --------    --------------------
agree        disagree    agree       agree       disagree      disagree    cluster1?
disagree     NaN         disagree    disagree    NaN           agree       NaN?
.            .           .           .           .             .           .
.            .           .           .           .             .           .
agree        disagree    agree       agree       disagree      agree       cluster7?

Step 1: Generate test data

Create some (almost) random test data.

cols = [f'col{i}' for i in range(1, 9)]
rows = 100

def create_data():
    from random import random
    for i in range(0, rows):
        # probability of 'agree' grows with the row index; roughly 5% of the rest are None
        yield ['agree' if random() < i / rows else 'disagree' if random() < 0.95 else None for c in cols]

df = spark.createDataFrame(list(create_data()), cols)

Step 2: Convert the strings

The VectorAssembler used in step 3 cannot handle the agree/disagree strings, so the strings are converted to numeric values. Here, Null/NaN values are treated as a third category.

boolean_cols = [f'{c}_bool' for c in cols]
df2 = df.selectExpr(cols + [f'if({c} = "agree", 1.0, if({c} = "disagree", 2.0, 3.0)) as {b}' for c, b in zip(cols, boolean_cols)])
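The `if` expression in the `selectExpr` above implements a simple three-way mapping. A plain-Python sketch of the same rule (for illustration only; the Spark job does this column-wise in SQL expressions):

```python
def encode(answer):
    """Encode a survey answer as a numeric feature.

    Mirrors the selectExpr mapping from step 2: Null/NaN (or any
    unexpected value) is treated as its own third category.
    """
    if answer == "agree":
        return 1.0
    if answer == "disagree":
        return 2.0
    return 3.0  # None/NaN or anything else
```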

Using a StringIndexer would also be an option, but since there are only two distinct strings, that might be over-engineered.

Step 3: Create the feature column

PySpark's K-means implementation expects the features in a single vector column. A VectorAssembler performs this task.

from pyspark.ml.feature import VectorAssembler
df3 = VectorAssembler(inputCols=boolean_cols, outputCol="features").transform(df2)
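Conceptually, the `VectorAssembler` just concatenates the listed input columns of each row into one feature vector. A per-row plain-Python sketch of that operation (the real transformer produces a `pyspark.ml.linalg.Vector`, not a Python list):

```python
def assemble(row, input_cols):
    """Concatenate the values of input_cols into a single feature list,
    mimicking what VectorAssembler does for each row."""
    return [row[c] for c in input_cols]
```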

Step 4: Finally, run the clustering algorithm

from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=8).setSeed(1)
kmeans.setMaxIter(10)
model = kmeans.fit(df3)
predictions = model.transform(df3)
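For intuition about what `fit`/`transform` compute, here is a toy Lloyd's-algorithm K-means in plain Python. This is only a sketch with a simplistic deterministic initialization (first k points as centers); Spark's implementation is distributed and uses k-means|| initialization:

```python
def kmeans(points, k, iters=10):
    """Toy Lloyd's algorithm: alternate assignment and center updates."""
    # simplistic init for the sketch: first k points become the centers
    centers = [list(p) for p in points[:k]]
    labels = []
    for _ in range(iters):
        # assignment step: each point goes to the nearest center
        # (squared Euclidean distance, as in Spark's default)
        clusters = [[] for _ in range(k)]
        labels = []
        for p in points:
            j = min(range(k),
                    key=lambda c: sum((a - b) ** 2 for a, b in zip(p, centers[c])))
            clusters[j].append(p)
            labels.append(j)
        # update step: move each center to the mean of its members
        centers = [
            [sum(dim) / len(cl) for dim in zip(*cl)] if cl else centers[j]
            for j, cl in enumerate(clusters)
        ]
    return labels, centers
```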

After dropping the intermediate columns from the output, we get

predictions.select(cols + ['prediction']).show()
+--------+--------+--------+--------+--------+--------+--------+--------+----------+
|    col1|    col2|    col3|    col4|    col5|    col6|    col7|    col8|prediction|
+--------+--------+--------+--------+--------+--------+--------+--------+----------+
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree|         1|
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree|         1|
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree|         1|
[...]
|disagree|   agree|disagree|   agree|   agree|disagree|disagree|disagree|         3|
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree|         1|
|disagree|disagree|disagree|disagree|disagree|disagree|   agree|disagree|         5|
|disagree|   agree|   agree|   agree|disagree|disagree|disagree|   agree|         3|
|   agree|   agree|   agree|disagree|disagree|   agree|disagree|disagree|         6|
[...]
|   agree|   agree|   agree|   agree|   agree|   agree|   agree|   agree|         7|
|   agree|   agree|   agree|   agree|   agree|disagree|   agree|   agree|         2|
|   agree|   agree|   agree|   agree|   agree|   agree|   agree|   agree|         7|
+--------+--------+--------+--------+--------+--------+--------+--------+----------+
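Note that Spark's `prediction` column is 0-based (values 0 to k-1), while the question asks for labels like cluster1 to cluster8. The mapping is a simple shift; a plain-Python sketch of the label rule (in Spark this could be applied with `withColumn` and string concatenation):

```python
def cluster_label(prediction):
    # Spark KMeans predictions run 0..k-1; the question wants 1-based names
    return f"cluster{prediction + 1}"
```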

It should be something like this.

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// Trains a k-means model.
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)
// Make predictions
val predictions = model.transform(dataset)
// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")
// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)

See here for more information:

https://spark.apache.org/docs/latest/ml-clustering.html

This is the parent link:

https://spark.apache.org/docs/latest/ml-classification-regression.html
