将一个HBase表中的多个列名放到一个SparkRDD中



我必须将HBase中一个表中的多个列族放入一个sparkRDD中。我正在尝试使用以下代码:(在第一个答案之后编辑的问题)

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable    
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark._
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client._
object HBaseRead {
   def main(args: Array[String]) {
     val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local").set("spark.driver.allowMultipleContexts","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     val sc = new SparkContext(sparkConf)        
     val conf = HBaseConfiguration.create()  
     val tableName = "TableName"  
     ////setting up required stuff 
     System.setProperty("user.name", "hdfs")        
     System.setProperty("HADOOP_USER_NAME", "hdfs")
     conf.set("hbase.master", "localhost:60000")
     conf.setInt("timeout", 120000)
     conf.set("hbase.zookeeper.quorum", "localhost")
     conf.set("zookeeper.znode.parent", "/hbase-unsecure")
     conf.set(TableInputFormat.INPUT_TABLE, tableName)
     sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result])) 
     val admin = new HBaseAdmin(conf)
     if (!admin.isTableAvailable(tableName)) {
          val tableDesc = new HTableDescriptor(tableName)
          admin.createTable(tableDesc)
     }
     case class Model(Shoes: String,Clothes: String,T-shirts: String)
     var hBaseRDD2 = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
     val transformedRDD = hBaseRDD2.map(tuple => {
         val result = tuple._2
         Model(Bytes.toString(result.getValue(Bytes.toBytes("Category"),Bytes.toBytes("Shoes"))),
         Bytes.toString(result.getValue(Bytes.toBytes("Category"),Bytes.toBytes("Clothes"))),
         Bytes.toString(result.getValue(Bytes.toBytes("Category"),Bytes.toBytes("T-shirts")))
         )
     })
     val totalcount = transformedRDD.count()
     println(totalcount)
   }
}

我想做的是创建一个rdd,其中来自这些列族的第一行(以及随后的行)的值将组合在rdd中的单个数组中。任何帮助都会很感激。由于

您可以通过几种方式进行操作,在rdd map中,您可以从父rdd[hBaseRDD2]中获取所有列并将其转换并将其作为另一个rdd返回。

或者您可以创建一个case类并将其映射到这些列。

例如

:

case class Model(column1: String,
                      column1: String,
                      column1: String)
var hBaseRDD2 = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
val transformedRDD = hBaseRDD2.map(tuple => {
    val result = tuple._2
    Model(Bytes.toString(result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("Columnname1"))),
    Bytes.toString(result.getValue(Bytes.toBytes("cf2"),Bytes.toBytes("Columnname2"))),
    Bytes.toString(result.getValue(Bytes.toBytes("cf2"),Bytes.toBytes("Columnname2")))
    )
})

最新更新