I have a DataFrame that I want to insert into HBase. I am following this documentation.
This is what my DataFrame looks like:
+----+-------+---------+
| id | name  | address |
+----+-------+---------+
| 23 | marry | france  |
| 87 | zied  | italie  |
+----+-------+---------+
I used the following code to create an HBase table:
    val tableName = "two"
    val conf = HBaseConfiguration.create()
    // `admin` was not defined in the snippet as posted; HBaseAdmin is assumed here
    // because it matches the older client API used below (HTableDescriptor, HColumnDescriptor)
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      print("-----------------------------------------------------------------------------------------------------------")
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor("z1".getBytes()))
      admin.createTable(tableDesc)
    } else {
      print("Table already exists!!--------------------------------------------------------------------------------------")
    }
Now, how do I insert this DataFrame into HBase?
In another example, I successfully inserted data into HBase with the following code:
    val myTable = new HTable(conf, tableName)
    for (i <- 0 to 1000) {
      val p = new Put(Bytes.toBytes("" + i))
      p.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes("" + (i * 5)))
      p.add("z1".getBytes(), "age".getBytes(), Bytes.toBytes("2017-04-20"))
      p.add("z2".getBytes(), "job".getBytes(), Bytes.toBytes("" + i))
      p.add("z2".getBytes(), "salary".getBytes(), Bytes.toBytes("" + i))
      myTable.put(p)
    }
    myTable.flushCommits()
But now I am stuck: how do I insert each record of my DataFrame into my HBase table?
Thank you for your time and attention.
Another approach is to look at rdd.saveAsNewAPIHadoopDataset to insert the data into the HBase table.
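    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().appName("sparkToHive").enableHiveSupport().getOrCreate()
      import spark.implicits._

      // HBase connection settings; "ip's" is a placeholder for the ZooKeeper quorum hosts
      val config = HBaseConfiguration.create()
      config.set("hbase.zookeeper.quorum", "ip's")
      config.set("hbase.zookeeper.property.clientPort", "2181")
      config.set(TableInputFormat.INPUT_TABLE, "tableName")

      // Job configuration telling Hadoop to write into the HBase table
      val newAPIJobConfiguration1 = Job.getInstance(config)
      newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "tableName")
      newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

      val df: DataFrame = Seq(("foo", "1", "foo1"), ("bar", "2", "bar1")).toDF("key", "value1", "value2")

      // One Put per row: the first column is the row key, the others become cells
      val hbasePuts = df.rdd.map((row: Row) => {
        val put = new Put(Bytes.toBytes(row.getString(0)))
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("value1"), Bytes.toBytes(row.getString(1)))
        put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("value2"), Bytes.toBytes(row.getString(2)))
        (new ImmutableBytesWritable(), put)
      })
      hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration())
    }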
Reference: https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/
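To apply this to the DataFrame in the question, each row would become one Put keyed by id, with name and address stored as cells in column family z1. The sketch below only models that mapping in plain Scala, without the HBase client, so it can be run on its own. `PutModel`, `Cell` and `rowToCells` are hypothetical names made up for illustration, and it assumes every value is already a string.

```scala
object PutModel {
  // Library-free stand-in for one HBase cell: (row key, column family, qualifier, value)
  final case class Cell(rowKey: String, family: String, qualifier: String, value: String)

  // Turns one record into its cells: the key column becomes the row key,
  // every other column becomes a cell in the given column family.
  def rowToCells(row: Map[String, String], keyCol: String, family: String): Seq[Cell] = {
    val key = row(keyCol)
    row.toSeq
      .collect { case (col, value) if col != keyCol => Cell(key, family, col, value) }
      .sortBy(_.qualifier) // stable order, purely for readability
  }

  def main(args: Array[String]): Unit = {
    val record = Map("id" -> "23", "name" -> "marry", "address" -> "france")
    rowToCells(record, "id", "z1").foreach(println)
  }
}
```

In the real job, each such cell would correspond to one put.addColumn(family, qualifier, value) call inside the map over df.rdd, as in the code above.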
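Below is a complete example using the Hortonworks Spark HBase connector, which is available on Maven.
This example shows how to
- check whether an HBase table exists
- create the HBase table (if it does not exist)
- insert a DataFrame into the HBase table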
    import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

    object Main extends App {

      case class Employee(key: String, fName: String, lName: String, mName: String,
                          addressLine: String, city: String, state: String, zipCode: String)

      // table 'employee' with column families 'person' and 'address';
      // it is created below if it does not exist yet
      val tableNameString = "default:employee"
      val colFamilyPString = "person"
      val colFamilyAString = "address"
      val tableName = TableName.valueOf(tableNameString)
      val colFamilyP = colFamilyPString.getBytes
      val colFamilyA = colFamilyAString.getBytes

      val hBaseConf = HBaseConfiguration.create()
      val connection = ConnectionFactory.createConnection(hBaseConf)
      val admin = connection.getAdmin()

      println("Check if table 'employee' exists:")
      val tableExistsCheck: Boolean = admin.tableExists(tableName)
      println(s"Table $tableName exists? $tableExistsCheck")

      if (!tableExistsCheck) {
        println("Create Table employee with column families 'person' and 'address'")
        val colFamilyBuild1 = ColumnFamilyDescriptorBuilder.newBuilder(colFamilyP).build()
        val colFamilyBuild2 = ColumnFamilyDescriptorBuilder.newBuilder(colFamilyA).build()
        val tableDescriptorBuild = TableDescriptorBuilder.newBuilder(tableName)
          .setColumnFamily(colFamilyBuild1)
          .setColumnFamily(colFamilyBuild2)
          .build()
        admin.createTable(tableDescriptorBuild)
      }

      // define schema for the dataframe that should be loaded into HBase
      def catalog =
        s"""{
           |"table":{"namespace":"default","name":"employee"},
           |"rowkey":"key",
           |"columns":{
           |"key":{"cf":"rowkey","col":"key","type":"string"},
           |"fName":{"cf":"person","col":"firstName","type":"string"},
           |"lName":{"cf":"person","col":"lastName","type":"string"},
           |"mName":{"cf":"person","col":"middleName","type":"string"},
           |"addressLine":{"cf":"address","col":"addressLine","type":"string"},
           |"city":{"cf":"address","col":"city","type":"string"},
           |"state":{"cf":"address","col":"state","type":"string"},
           |"zipCode":{"cf":"address","col":"zipCode","type":"string"}
           |}
           |}""".stripMargin

      // define some test data
      val data = Seq(
        Employee("1", "Horst", "Hans", "A", "12main", "NYC", "NY", "123"),
        Employee("2", "Joe", "Bill", "B", "1337ave", "LA", "CA", "456"),
        Employee("3", "Mohammed", "Mohammed", "C", "1Apple", "SanFran", "CA", "678")
      )

      // create SparkSession
      val spark: SparkSession = SparkSession.builder()
        .master("local[*]")
        .appName("HBaseConnector")
        .getOrCreate()

      // serialize data
      import spark.implicits._
      val df = spark.sparkContext.parallelize(data).toDF

      // write dataframe into HBase
      df.write.options(
        Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "3")) // create 3 regions
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .save()
    }
This worked for me once the relevant site XML files ("core-site.xml", "hbase-site.xml", "hdfs-site.xml") were in my resources.
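For the DataFrame in the question (id, name, address, with column family z1 in table "two"), a catalog in the same layout could be generated rather than typed by hand. This is only a sketch: `CatalogSketch` and `buildCatalog` are hypothetical helper names, it puts every non-key column into a single column family, and it assumes all columns are strings (an integer id would need a cast or a different "type").

```scala
object CatalogSketch {
  // Builds a catalog JSON string in the same layout as the hand-written one above.
  // Assumption: all columns are typed "string" and share one column family.
  def buildCatalog(namespace: String, table: String, rowKeyCol: String,
                   family: String, valueCols: Seq[String]): String = {
    // One JSON entry per DataFrame column: "<dfColumn>":{"cf":...,"col":...,"type":...}
    def entry(dfCol: String, cf: String): String =
      s""""$dfCol":{"cf":"$cf","col":"$dfCol","type":"string"}"""

    val columns =
      (entry(rowKeyCol, "rowkey") +: valueCols.map(entry(_, family))).mkString(",\n")

    Seq(
      "{",
      s""""table":{"namespace":"$namespace","name":"$table"},""",
      s""""rowkey":"$rowKeyCol",""",
      """"columns":{""",
      columns,
      "}",
      "}"
    ).mkString("\n")
  }

  def main(args: Array[String]): Unit =
    println(buildCatalog("default", "two", "id", "z1", Seq("name", "address")))
}
```

The resulting string would then be passed as the value for HBaseTableCatalog.tableCatalog in the write options.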
Posting this as an answer for the sake of code formatting. The documentation says:
    sc.parallelize(data).toDF.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.hadoop.hbase.spark")
      .save()
Here sc.parallelize(data).toDF is your DataFrame. The documentation example uses sc.parallelize(data).toDF to convert a Scala collection into a DataFrame.
You already have a DataFrame, so just try calling
    yourDataFrame.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.hadoop.hbase.spark")
      .save()
It should work. The documentation is pretty clear...
UPD
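Given a DataFrame with the specified schema, the call above will create an HBase table with 5 regions and save the DataFrame in it. Note that if HBaseTableCatalog.newTable is not specified, the table has to be created beforehand.
This is about data partitioning. Each HBase table can have 1...X regions. The number of regions should be chosen carefully: too few regions is bad, and too many regions is bad as well.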
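To make the region count a bit more concrete: a table with N regions is delimited by N - 1 split keys, and HBase compares row keys byte by byte, so string keys sort lexicographically. The sketch below computes evenly spaced split keys for zero-padded numeric ids. It is an illustration only, under the assumption that row keys are zero-padded decimal ids in the range 0 to maxId; `SplitKeySketch` and `splitKeys` are hypothetical names, and this is not a description of how the connector picks its own split points.

```scala
object SplitKeySketch {
  // Returns (regions - 1) zero-padded split keys dividing the id range [0, maxId] evenly.
  // Assumption: row keys are decimal ids padded to the width of maxId,
  // so lexicographic order matches numeric order.
  def splitKeys(maxId: Int, regions: Int): Seq[String] = {
    require(regions >= 1, "a table has at least one region")
    require(maxId >= 0, "maxId must be non-negative")
    val width = maxId.toString.length
    (1 until regions).map { i =>
      val boundary = (maxId.toLong + 1) * i / regions
      s"%0${width}d".format(boundary)
    }
  }

  def main(args: Array[String]): Unit =
    println(splitKeys(999, 5).mkString(", "))
}
```

With unpadded keys such as "9" and "10" the lexicographic order differs from the numeric one, so split points chosen this way would not divide the data evenly.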