如何将csv转换为Rdd[Double]?我有错误:不能应用到(org.apache.spark.rdd.RDD[Unit])在这一行:
val kd = new KernelDensity().setSample(rows)
我的完整代码在这里:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.stat.KernelDensity
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
class KdeAnalysis {
val conf = new SparkConf().setAppName("sample").setMaster("local")
val sc = new SparkContext(conf)
val DATAFILE: String = "C:\Users\ajohn\Desktop\spark_R\data\mass_cytometry\mass.csv"
val rows = sc.textFile(DATAFILE).map {
line => val values = line.split(',').map(_.toDouble)
Vectors.dense(values)
}.cache()
// Construct the density estimator with the sample data and a standard deviation for the Gaussian
// kernels
val rdd : RDD[Double] = sc.parallelize(rows)
val kd = new KernelDensity().setSample(rdd)
.setBandwidth(3.0)
// Find density estimates for the given values
val densities = kd.estimate(Array(-1.0, 2.0, 5.0))
}
由于rows
是RDD[org.apache.spark.mllib.linalg.Vector]
,以下行不能工作:
val rdd : RDD[Double] = sc.parallelize(rows)
parallelize
期望Seq[T]
, RDD
不是Seq
。
即使这部分工作如你所愿,你的输入也是错误的。KernelDensity.setSample
的正确参数是RDD[Double]
或JavaRDD[java.lang.Double]
。目前看来不支持多变量数据。
关于一个问题从瓷砖你可以flatMap
rows.flatMap(_.toArray)
或者创建rows
val rows = sc.textFile(DATAFILE).flatMap(_.split(',').map(_.toDouble)).cache()
但我怀疑这是不是你真正需要的。
已准备好此代码,请评估它是否可以帮助您->
val doubleRDD = rows.map(_.toArray).flatMap(x => x)