将Rdd[Vector]转换为Rdd[Double]



如何将csv转换为Rdd[Double]?我有错误:不能应用到(org.apache.spark.rdd.RDD[Unit])在这一行:

val kd = new KernelDensity().setSample(rows) 

我的完整代码在这里:

   import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    import org.apache.spark.mllib.stat.KernelDensity
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkContext, SparkConf}
class KdeAnalysis {
  val conf = new SparkConf().setAppName("sample").setMaster("local")
  val sc = new SparkContext(conf)
  val DATAFILE: String = "C:\Users\ajohn\Desktop\spark_R\data\mass_cytometry\mass.csv"
  val rows = sc.textFile(DATAFILE).map {
    line => val values = line.split(',').map(_.toDouble)
      Vectors.dense(values)
  }.cache()

  // Construct the density estimator with the sample data and a standard deviation for the Gaussian
  // kernels
  val rdd : RDD[Double] = sc.parallelize(rows)
  val kd = new KernelDensity().setSample(rdd)
    .setBandwidth(3.0)
  // Find density estimates for the given values
  val densities = kd.estimate(Array(-1.0, 2.0, 5.0))
}

由于rowsRDD[org.apache.spark.mllib.linalg.Vector],以下行不能工作:

val rdd : RDD[Double] = sc.parallelize(rows)

parallelize期望Seq[T], RDD不是Seq

即使这部分工作如你所愿,你的输入也是错误的。KernelDensity.setSample的正确参数是RDD[Double]JavaRDD[java.lang.Double]。目前看来不支持多变量数据。

关于一个问题从瓷砖你可以flatMap

rows.flatMap(_.toArray)

或者创建rows

val rows = sc.textFile(DATAFILE).flatMap(_.split(',').map(_.toDouble)).cache()

但我怀疑这是不是你真正需要的。

已准备好此代码,请评估它是否可以帮助您->

val doubleRDD = rows.map(_.toArray).flatMap(x => x)

最新更新