Spark K 意味着性能随着节点/实例的增加而降低



我正在使用Spark MLLib在AWS EMR上执行K-means集群。 数据集大约为 10^6 行,包含 9 个特征列。 我使用的实例大小有 8vCPU 和 32GB 内存。

我预计随着集群上节点数量的增加,我会从 Spark 提高性能(减少执行时间(,但我得到了相反的结果。

与单个工作节点相比,使用更多的工作器节点/实例获得更差的性能(更高的执行时间(。 我对 5、10 和 15 个工作器节点的集群有相同的结果;随着节点数量的增加,性能会下降。 我尝试改变分区(spark.sql.shuffle.partitions(,并使用执行器核心,执行器数量和执行器内存的各种配置。

我的代码如下(执行器的数量适用于 10 个工作节点(:

spark-shell --executor-cores 3 num-executors 20 --executor-memory 10G
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._ 
import org.apache.spark.sql.functions._
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.{SparkSession, SQLContext, DataFrame}
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.ml.linalg.{Vector, Vectors, DenseVector, VectorUDT}
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseVector, SparseVector}
sc.stop()
val configuration = new Configuration()
val sc = new SparkContext("local", "phoenix-load")
val sqlContext = new SQLContext(sc) 
//dataset is loaded in from Phoenix table and set as featureDf6
//dataset is made up of all numerical values (DOUBLE)
val columns = Array("DUR","AVG_AMP","AVG_POW","PAPR","SNR","SNR_DB","BW_3DB","BW_10DB","BWNE")    
val assembler = new VectorAssembler().setInputCols(columns).setOutputCol("featuresin")
val df = assembler.transform(featureDf6)
val scaler = new MinMaxScaler().setInputCol("featuresin").setOutputCol("features").setMin(-1).setMax(1)
val scalerModel = scaler.fit(df)
val scaledData = scalerModel.transform(df)
val kmeans = new KMeans().setK(14).setSeed(1L).setMaxIter(1000)
val model = kmeans.fit(scaledData)

我发现问题的原因是Spark从Phoenix/HBase读取数据的方法。 当我将数据集直接上传到 Spark 时,结果符合预期,随着节点的增加,执行时间会减少。 我将发布另一个问题,以确定从凤凰城阅读过程中的错误。

最新更新