Error in returning a Spark RDD from a function called inside a map function



I have a collection of rowkeys (plants, shown below) from an HBase table, and I want to write a fetchData function that returns the RDD of data for the rowkeys in that collection. The aim is to union the RDDs returned by fetchData for each element of the plants collection. I have given the relevant part of the code below. My problem is that the code gives a compilation error related to the return type of fetchData:

println("Part: "+ hBaseRDD.getNumPartitions)

error: value getNumPartitions is not a member of Vector[org.apache.spark.rdd.RDD[it.nerdammer.spark.test.sys.Record]]

I am using Scala 2.11.8 and Spark 2.2.0, and compiling with Maven.

import it.nerdammer.spark.hbase._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD
object sys {
  case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
  val spark = SparkSession.builder().appName("myApp").config("spark.executor.cores",4).getOrCreate()
  import spark.implicits._
  type Record = (String, Option[String], Option[String])
  def fetchData(plant: String): RDD[Record] = {
    val start_index = plant
    val end_index = plant + "z"
    //The below command works fine if I run it in main function, but to get multiple rows from hbase, I am using it in a separate function
    spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
  }
  def main(args: Array[String]) {
    //the below elements in the collection are prefix of relevant rowkeys in hbase table ("test_table") 
    val plants = Vector("a8","cu","aw","fx")
    val hBaseRDD = plants.map( pp => fetchData(pp))
    println("Part: "+ hBaseRDD.getNumPartitions)
    /*
      rest of the code
    */
  }
}

Here is the working version of the code. The problem with it is that I am using a for loop, so I have to request the data corresponding to each rowkey (plant) from HBase on every iteration, instead of first fetching all the data and then executing the rest of the code.

    import it.nerdammer.spark.hbase._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
    import org.apache.log4j.Level
    import org.apache.log4j.Logger
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    object sys {
      case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
      def main(args: Array[String]) {
        
        val spark = SparkSession.builder().appName("myApp").config("spark.executor.cores",4).getOrCreate()
        import spark.implicits._
        type Record = (String, Option[String], Option[String])
        val plants = Vector("a8","cu","aw","fx")
        
        for (plant <- plants){
          val start_index = plant
          val end_index = plant + "z"
          val hBaseRDD = spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
          println("Part: "+ hBaseRDD.getNumPartitions)
          /*
            rest of the code
          */
        }
      }
    }

After some attempts, this is where I am stuck now. So how do I cast the result to the required type?

scala>   def fetchData(plant: String) = {
     |     val start_index = plant
     |     val end_index = plant + "~"
     |     val x1 = spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
     |     x1
     |   }
    

I defined the function in the REPL and ran the following:

scala> val hBaseRDD = plants.map( pp => fetchData(pp)).reduceOption(_ union _)
<console>:39: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, Option[String], Option[String])]
 required: it.nerdammer.spark.hbase.HBaseReaderBuilder[(String, Option[String], Option[String])]
       val hBaseRDD = plants.map( pp => fetchData(pp)).reduceOption(_ union _)

Thanks in advance!

The type of hBaseRDD is Vector[_], not RDD[_], so you cannot call the method getNumPartitions on it. If I understand correctly, you want to union the fetched RDDs. You can do that with plants.map( pp => fetchData(pp)).reduceOption(_ union _) (I recommend reduceOption because it does not fail on an empty list, but if you are confident the list is never empty you can use reduce).
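
As a minimal sketch of that suggestion, reusing the names from the question (plants, fetchData, Record) and assuming fetchData is fixed to return RDD[Record] (see the next point):

    import org.apache.spark.rdd.RDD

    // Union the per-plant RDDs; reduceOption yields None if plants is empty
    val hBaseRDD: Option[RDD[Record]] = plants.map(pp => fetchData(pp)).reduceOption(_ union _)

    // getNumPartitions is a member of the RDD inside the Option, not of the Option itself
    hBaseRDD.foreach(rdd => println("Part: " + rdd.getNumPartitions))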

Also, the return type of fetchData is RDD[U], but I did not find any definition of U. That is probably why the compiler infers Vector[Nothing] instead of Vector[RDD[Record]]. To avoid subsequent errors, you should also change RDD[U] to RDD[Record].
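
Putting the two points together, here is a hedged sketch of fetchData with an explicit RDD[Record] return type. It reuses the spark session, the Record type alias and the table layout from the question, and assumes the connector's implicit conversion from HBaseReaderBuilder to RDD (brought into scope by import it.nerdammer.spark.hbase._, which the first code block already appears to rely on) kicks in once the return type is annotated:

    import it.nerdammer.spark.hbase._
    import org.apache.spark.rdd.RDD

    def fetchData(plant: String): RDD[Record] = {
      val start_index = plant
      val end_index = plant + "~"
      // Annotating the return type as RDD[Record] lets the implicit conversion
      // turn the HBaseReaderBuilder into an RDD before the function returns
      spark.sparkContext.hbaseTable[Record]("test_table")
        .select("iacp", "temp")
        .inColumnFamily("pp")
        .withStartRow(start_index)
        .withStopRow(end_index)
    }

With that change, the reduceOption union above produces an Option[RDD[Record]] instead of failing with the HBaseReaderBuilder type mismatch shown in the REPL session.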
