apache spark - Scala:将结果保存在toDf临时表中



我试图在toDF TempTable中保存一些分析,但收到以下错误":215:error: value toDF不是Double的成员"。我正在读取卡桑德拉表的数据,我正在做一些计算。我想保存这些结果在一个临时表。我是scala的新手,有人能帮帮我吗?我的代码

case class Consumo(consumo:Double, consumo_mensal: Double, mes:   org.joda.time.DateTime,ano: org.joda.time.DateTime, soma_pf: Double,empo_gasto: Double);
object Analysegridata{
val conf = new SparkConf(true)
 .set("spark.cassandra.connection.host","127.0.0.1").setAppName("LiniarRegression")
.set("spark.cassandra.connection.port", "9042")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.streaming.receiver.writeAheadLog.enable", "true");
val sc = new SparkContext(conf);
val ssc = new StreamingContext(sc, Seconds(1))
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val checkpointDirectory = "/var/lib/cassandra/data"
ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
// val context = StreamingContext.getOrCreate(checkpointDirectory)    
 import sqlContext.implicits._
 JavaSparkContext.fromSparkContext(sc);
 def rddconsumo(rddData: Double): Double = {
 val  rddData: Double = {  
  implicit val data = conf
  val grid = sc.cassandraTable("smartgrids", "analyzer").as((r:Double) => (r)).collect
 def goto(cs: Array[Double]): Double = {
   var consumo = 0.0;
   var totaldias = 0;
   var soma_pf = 0.0;
   var somamc = 0.0;
   var tempo_gasto = 0.0;
   var consumo_mensal = 0.0;
   var i=0
for (i <- 0 until grid.length) {      
   val minutos = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MINUTE");
   val horas = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol","HOUR_OF_DAY");
   val dia = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "DAY_OF_MONTH");
   val ano = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "YEAR");
   val mes = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MONTH");
   val potencia = sc.cassandraTable("smartgrids","analyzer_temp").select("n_pf1", "n_pf2", "n_pf3")
 def convert_minutos (minuto : Int) : Double ={
  minuto/60
 }
   dia.foreach (i =>  {
   def adSum(potencia: Array[Double]) = { 
    var i=0;
      while (i < potencia.length) {
       soma_pf  += potencia(i);
       i += 1;
       soma_pf;
       println("Potemcia =" + soma_pf)  
    }  
 }
   def tempo(minutos: Array[Int]) = { 
       var i=0;
       while (i < minutos.length) {
         somamc  += convert_minutos(minutos(i))  
         i += 1;
        somamc 
     } 
  }
  def tempogasto(horas: Array[Int]) = { 
      var i=0;
      while (i < horas.length) {
       tempo_gasto = horas(i) + somamc;
       i += 1;
       tempo_gasto;
       println("Temo que o aparelho esteve ligado =" + tempo_gasto)
     } 
 }
 def consumof(dia: Array[Int]) = { 
     var i=0;
     while (i < dia.length) {
       consumo = soma_pf * tempo_gasto;
       i += 1;
       consumo; 
       println("Consumo diario =" + consumo)       
     } 
  }
})
mes.foreach (i => {
 def totaltempo(dia: Array[Int]) = { 
     var i = 0;
     while(i < dia.length){
       totaldias += dia(i);
       i += 1;
       totaldias;
       println("Numero total de dias =" + totaldias)  
     }
 }
 def consumomensal(mes: Array[Int]) = { 
     var i = 0;
     while(i < mes.length){
      consumo_mensal = consumo * totaldias;
      i += 1;
     consumo_mensal;
     println("Consumo Mensal =" + consumo_mensal);
   }
} 
})
}
    consumo; 
    totaldias;
    consumo_mensal; 
    soma_pf;
    tempo_gasto;
    somamc
}
 rddData
  }
   rddData.toDF().registerTempTable("rddData")
 }
      ssc.start()
      ssc.awaitTermination()


 error: value toDF is not a member of Double"

您究竟想要做什么并不清楚(代码太多,请尝试提供一个最小的示例),但是存在一些明显的问题:

  1. rddData具有类型Double :似乎应该是RDD[Double](这是双值的分布式集合)。试图将单个Double值保存为表没有多大意义,实际上-不起作用(toDF可以在RDD上调用,而不是任何类型,特别是在Double上,正如编译器警告的那样)。
  2. collect()数据:如果你想加载一个RDD,使用一些操作转换它,然后将其保存为一个表- collect()可能不应该在RDD上调用。collect()将所有数据(分布在整个集群中)发送到单个"驱动程序"机器(运行此代码的机器)—之后,您没有利用集群,也没有使用RDD数据结构,因此您无法使用toDF将数据转换为DataFrame。

相关内容

  • 没有找到相关文章

最新更新