I have a very large dataframe and I have to use 8 specific columns from it, whose values are either "strongly agree", "agree", or "disagree". Based on these 8 columns I need to create a new column that tells which cluster (1-8) each row belongs to (preferably using K-means clustering).
Is this possible in PySpark?
P.S.: I am new to PySpark, so any help would be greatly appreciated.
column:1   column:2   column:3   column:4   column:5   ... column:8   new_column_required?
--------   --------   --------   --------   --------       --------   --------------------
agree      disagree   agree      agree      disagree       disagree   cluster1?
disagree   NaN        disagree   disagree   NaN            agree      NaN?
...        ...        ...        ...        ...            ...        ...
agree      disagree   agree      agree      disagree       agree      cluster 7?
Step 1: Generate test data
Create some (almost) random test data.
cols = [f'col{i}' for i in range(1, 9)]
rows = 100

def create_data():
    from random import random
    for i in range(0, rows):
        yield ['agree' if random() < i / rows else 'disagree' if random() < 0.95 else None for c in cols]

df = spark.createDataFrame(list(create_data()), cols)
Step 2: Transform the strings
The VectorAssembler in step 3 cannot handle the agree/disagree strings, so they are converted into numeric values. Null/NaN values are treated here as a third category.
boolean_cols = [f'{c}_bool' for c in cols]
df2 = df.selectExpr(cols + [f'if( {c} = "agree", 1.0, if( {c} = "disagree", 2.0, 3.0)) as {b}' for c, b in zip(cols, boolean_cols)])
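The nested `if` expression above maps each string to a number. The same mapping, written as a plain Python function (a sketch for illustration only; it is not part of the Spark pipeline), would be:

```python
def encode(value):
    """Mirror of the Spark SQL expression
    if(c = "agree", 1.0, if(c = "disagree", 2.0, 3.0))."""
    if value == 'agree':
        return 1.0
    if value == 'disagree':
        return 2.0
    return 3.0  # Null/NaN (and any other value) fall into a third category

print([encode(v) for v in ['agree', 'disagree', None]])  # [1.0, 2.0, 3.0]
```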
Using a StringIndexer would also be an option, but with only two distinct strings that might be a bit over-engineered.
Step 3: Create the feature column
PySpark's K-means implementation expects the features in a single vector column. Use a VectorAssembler for this task.
from pyspark.ml.feature import VectorAssembler
df3 = VectorAssembler(inputCols=boolean_cols, outputCol="features").transform(df2)
Step 4: Finally, run the clustering algorithm
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=8).setSeed(1)
kmeans.setMaxIter(10)
model = kmeans.fit(df3)
predictions = model.transform(df3)
After removing the intermediate columns from the output, we get
predictions.select(cols + ['prediction']).show()
+--------+--------+--------+--------+--------+--------+--------+--------+----------+
| col1| col2| col3| col4| col5| col6| col7| col8|prediction|
+--------+--------+--------+--------+--------+--------+--------+--------+----------+
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree| 1|
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree| 1|
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree| 1|
[...]
|disagree| agree|disagree| agree| agree|disagree|disagree|disagree| 3|
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree| 1|
|disagree|disagree|disagree|disagree|disagree|disagree| agree|disagree| 5|
|disagree| agree| agree| agree|disagree|disagree|disagree| agree| 3|
| agree| agree| agree|disagree|disagree| agree|disagree|disagree| 6|
[...]
| agree| agree| agree| agree| agree| agree| agree| agree| 7|
| agree| agree| agree| agree| agree|disagree| agree| agree| 2|
| agree| agree| agree| agree| agree| agree| agree| agree| 7|
+--------+--------+--------+--------+--------+--------+--------+--------+----------+
It should be something like this.
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// Trains a k-means model.
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)
// Make predictions
val predictions = model.transform(dataset)
// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")
// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
See here for more information:
https://spark.apache.org/docs/latest/ml-clustering.html
This is the parent link:
https://spark.apache.org/docs/latest/ml-classification-regression.html