How to connect to an HBase table with Spark and fetch data for specific known row keys without scanning the whole table



I have a large number of row keys and need to fetch the data for just those keys. The table is very large, so I cannot scan it entirely or load the whole table into Spark.

There are many HBase-Spark libraries, but most of them are old and no longer maintained, so I would rather use an old, reliable approach.

Try the following.

I have a 'word_counts' HBase table with a 'counts' column family. In my example, the 'counts:count' column stores a Long.
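For context, such a row can be written with the plain HBase client by storing the count as the raw 8-byte encoding of a Long, which is why Bytes.toLong is used to read it back below. A minimal sketch (the row key, the count value and the 'zookeeper-host' quorum are placeholders):

import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.util.Bytes

val writeConf = HBaseConfiguration.create()
writeConf.set("hbase.zookeeper.quorum", "zookeeper-host")
val writeConnection = ConnectionFactory.createConnection(writeConf)
val writeTable = writeConnection.getTable(TableName.valueOf("word_counts"))
// Store the count as an 8-byte Long so Bytes.toLong can decode it later
val put = new Put(Bytes.toBytes("spark"))
put.addColumn(Bytes.toBytes("counts"), Bytes.toBytes("count"), Bytes.toBytes(42L))
writeTable.put(put)
writeTable.close()
writeConnection.close()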

I will use the newAPIHadoopRDD method and access HBase through the hbase-mapreduce library.

Let's get started (I am using Apache Spark 3.1.3, but this should work with any Spark version):

spark-shell --master yarn --packages org.apache.hbase:hbase-client:2.2.7,org.apache.hbase:hbase-common:2.2.7,org.apache.hbase:hbase-mapreduce:2.2.7
My code is as follows:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.client.Scan

// This function converts a Scan object to a Base64-encoded string
// representation that can be passed to HBase as a configuration value
def convertScanToString(scan: Scan): String = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBase64String(proto.toByteArray)
}
// Create a new Scan object and set a PrefixFilter 
// to only return rows with keys that start with "s"
val scan = new Scan()
val filter = new PrefixFilter(Bytes.toBytes("s"))
scan.setFilter(filter)
// Create a new HBase configuration and set the input table, 
// zookeeper quorum, and scan
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "word_counts")
conf.set("hbase.zookeeper.quorum", "zookeeper-host")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
// Create an RDD using the HBase configuration and the TableInputFormat
val rdd = spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
// Convert the RDD to a DataFrame and set the column names
val df = rdd.map(x => {
  val rowKey = Bytes.toString(x._2.getRow)
  val columnValue = Option(x._2.getValue(Bytes.toBytes("counts"), Bytes.toBytes("count")))
    .map(value => Bytes.toLong(value).toString)
    .getOrElse("")
  (rowKey, columnValue)
}).toDF("row_key", "column_value")
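
The PrefixFilter above returns every key that starts with "s". If you already know the exact row keys, the same newAPIHadoopRDD approach can be restricted to just those rows with a MultiRowRangeFilter, one closed range per key, so the region servers seek between the requested keys instead of reading everything in between. A sketch under the same assumptions (the key list is made up; convertScanToString, the 'word_counts' table and the 'zookeeper-host' quorum come from the code above):

import org.apache.hadoop.hbase.filter.MultiRowRangeFilter
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter.RowRange

// Hypothetical list of known row keys -- replace with your own
val rowKeys = Seq("spark", "hbase", "hadoop")

// One closed range [key, key] per exact key, so only those rows are returned
val ranges = rowKeys.map { k =>
  new RowRange(Bytes.toBytes(k), true, Bytes.toBytes(k), true)
}
val multiRangeScan = new Scan()
multiRangeScan.setFilter(new MultiRowRangeFilter(java.util.Arrays.asList(ranges: _*)))

val multiRangeConf = HBaseConfiguration.create()
multiRangeConf.set(TableInputFormat.INPUT_TABLE, "word_counts")
multiRangeConf.set("hbase.zookeeper.quorum", "zookeeper-host")
multiRangeConf.set(TableInputFormat.SCAN, convertScanToString(multiRangeScan))

val multiRangeRdd = spark.sparkContext.newAPIHadoopRDD(
  multiRangeConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

The resulting RDD can then be mapped to a DataFrame exactly as above.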

If you are only interested in specific keys, you can use the following instead:

import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "zookeeper-host")
val tableName = TableName.valueOf("word_counts")
val connection = ConnectionFactory.createConnection(hbaseConf)
val table = connection.getTable(tableName)
// A Get fetches a single row directly by its key -- no scan is involved
val get = new Get(Bytes.toBytes("s"))
val result: Result = table.get(get)
// counts:count holds a Long, so decode it with Bytes.toLong
val value = Option(result.getValue(Bytes.toBytes("counts"), Bytes.toBytes("count")))
  .map(v => Bytes.toLong(v).toString)
  .getOrElse("<not found>")
println(s"Value for key 's' is: $value")
table.close()
connection.close()
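
Since the question mentions a large number of row keys, the single Get above also generalizes to a batch: Table.get accepts a list of Gets and returns one Result per requested key. A minimal sketch against the same 'word_counts' table (the key list is hypothetical):

import java.util.{ArrayList => JArrayList}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

val batchConf = HBaseConfiguration.create()
batchConf.set("hbase.zookeeper.quorum", "zookeeper-host")
val batchConnection = ConnectionFactory.createConnection(batchConf)
val batchTable = batchConnection.getTable(TableName.valueOf("word_counts"))

// Hypothetical list of known row keys -- replace with your own
val batchKeys = Seq("spark", "hbase", "hadoop")
val gets = new JArrayList[Get]()
batchKeys.foreach(k => gets.add(new Get(Bytes.toBytes(k))))

// One Result per requested Get; missing rows come back as empty Results
val results: Array[Result] = batchTable.get(gets)
results.foreach { r =>
  if (!r.isEmpty) {
    val count = Bytes.toLong(r.getValue(Bytes.toBytes("counts"), Bytes.toBytes("count")))
    println(s"${Bytes.toString(r.getRow)} -> $count")
  }
}
batchTable.close()
batchConnection.close()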

You can also try the gimel library, which, according to its HBase md file, pushes filters down to HBase.

What is gimel?
Gimel is a Big Data Abstraction framework built on Apache Spark & other open source connectors in the industry.
Gimel provides unified Data API to read & write data to various stores.
Alongside, a unified SQL access pattern for all stores alike.
The APIs are available in both scala & python (pyspark).

HBase md file

In that md file you will find some examples; perhaps they are similar to your use case:

gsql("set gimel.hbase.columns.mapping=personal:name,personal:age")
val df = gsql("select * from udc.Hbase.ClusterName.default.test_emp where rowkey='1-MAC'")
df.show
df.explain
== Physical Plan ==
*(1) Filter isnotnull(rowkey#181)
+- *(1) Scan HBaseRelation(Map(catalog -> {"table":{"namespace":"default", "name":"test_emp", "tableCoder":"PrimitiveType"},
"rowkey":"rowkey",
"columns":{
"rowkey":{"cf":"rowkey", "col":"rowkey", "type":"string", "length":"50"},
"name":{"cf":"personal", "col":"name", "type":"string"},
"age":{"cf":"personal", "col":"age", "type":"string"}
}
}
),None) [rowkey#181,name#182,age#183] PushedFilters: [IsNotNull(rowkey), *EqualTo(rowkey,1-MAC)], ReadSchema: struct<rowkey:string,name:string,age:string>
