Hortonworks Spark HBase Connector (SHC) - NullPointerException when writing a DataFrame to HBase



Below is simple code that writes a Spark DataFrame to HBase using the Spark-HBase connector (SHC). The write fails with a NullPointerException at the df.write call and nothing is written to HBase. Reading from HBase with the same connector works fine, though (a read sketch is included after the write code for reference). The following links discuss the same problem, but the suggested solutions did not help:

https://github.com/hortonworks-spark/shc/issues/278

https://github.com/hortonworks-spark/shc/issues/46

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

object SparkHbaseHW {

  case class Employee(key: String, fName: String, lName: String,
                      mName: String, addressLine: String, city: String,
                      state: String, zipCode: String)

  def main(args: Array[String]): Unit = {

    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin

    val data = Seq(
      Employee("1", "Abby", "Smith", "K", "3456 main", "Orlando", "FL", "45235"),
      Employee("2", "Amaya", "Williams", "L", "123 Orange", "Newark", "NJ", "27656"),
      Employee("3", "Alchemy", "Davis", "P", "Warners", "Sanjose", "CA", "34789"))

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkHbaseWrite")
      .getOrCreate()

    import spark.implicits._
    val df = spark.sparkContext.parallelize(data).toDF

    // Fails here with the NullPointerException shown below
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "4"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}
```
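
For reference, a minimal sketch of the read path that does work for me, assuming the same `catalog` JSON and `spark` session as in the write code above:

```scala
// Read sketch (this direction works): load the same SHC catalog as a DataFrame
val readDf = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

readDf.show(false)
```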

Exception:

```
Exception in thread "main" java.lang.NullPointerException
at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:124)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:202)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:179)
at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:391)
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.insert(HBaseRelation.scala:230)
at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:61)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at com.test.HBaseSparkHW$.main(HbaseSparkHW.scala:55)
at com.test.HBaseSparkHW.main(HbaseSparkHW.scala)
20/05/18 16:59:50 INFO SparkContext: Invoking stop() from shutdown hook
```

As discussed there, I made the additional configuration change below to the SparkSession builder and the exception went away. However, I don't understand the cause or the proper fix. Hoping someone can explain.

```scala
val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("HbaseSparkWrite")
  .config("spark.hadoop.validateOutputSpecs", false)
  .getOrCreate()
```
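
For completeness, a minimal sketch of the same workaround with the flag set on a SparkConf up front instead of on the builder. `spark.hadoop.validateOutputSpecs` is a documented Spark setting that controls whether the output-specification check seen in the stack trace above is run before writing:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Same workaround, with the flag placed on a SparkConf instead of the builder.
// When "spark.hadoop.validateOutputSpecs" is false, Spark skips the
// OutputFormat.checkOutputSpecs() call that appears in the stack trace above.
val conf = new SparkConf()
  .setMaster("local[1]")
  .setAppName("HbaseSparkWrite")
  .set("spark.hadoop.validateOutputSpecs", "false")

val spark: SparkSession = SparkSession.builder()
  .config(conf)
  .getOrCreate()
```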

UPDATE