我正在尝试使用pySpark写入hbase表。到目前为止,我可以从hbase读取数据。但是在写入 HBase 表时出现异常。
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
properties = {
"instanceId" : "hbase",
"zookeepers" : "10-x-x-x.local:2181,10-x-x-x.local:2181,10-x-x-x.local:2181",
"hbase.columns.mapping" : "KEY_FIELD STRING :key, A STRING c:a, B STRING c:b",
"hbase.use.hbase.context" : False,
"hbase.config.resources" : "file:///etc/hbase/conf/hbase-site.xml",
"hbase.table" : "t"
}
spark = SparkSession
.builder
.appName("hbaseWrite")
.getOrCreate()
sc = spark.sparkContext
#I am able to read the data successfully.
#df = spark.read.format("org.apache.hadoop.hbase.spark")
# .options( **properties)
# .load()
data = [("3","DATA 3 A", "DATA 3 B")]
columns = ['KEY_FIELD','A','B']
cSchema = StructType([StructField(columnName, StringType()) for columnName in columns])
df = spark.createDataFrame(data, schema=cSchema)
df.write
.options( **properties)
.mode('overwrite').format("org.apache.hadoop.hbase.spark").save()
按以下格式执行命令:
spark2-submit --master local[*] write_to_hbase.py
Spark版本:2.2.0.cloudera1(我无法更改我的Spark版本)HBase 版本:1.2.0-cdh5.12.0(但我可以更改我的 HBase 版本)
注意:我已将 hbase jar 添加到 spark2 jar 文件夹中,并且已将以下依赖 jar 添加到 spark2 jar 文件夹中。
- 火花-core_2.11-1.6.1.jar
- htrace-core-3.1.0-孵化中.jar
- scala-library-2.9.1.jar
错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o70.save.
: java.lang.RuntimeException: org.apache.hadoop.hbase.spark.DefaultSource does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:476)
我尝试了多个建议,但没有任何效果。这可能是一个重复的问题,但我没有其他选择来找到答案。
使用的是Cloudera distribution
那么Hard Luck没有官方方法可以使用PYSAPRK
写入HBASE
。这已经得到了Cloudera support Team
的证实。
但是,如果您正在使用Hortonworks
并且您有spark 2.0
那么下面的链接应该可以帮助您入门。
Pyspark to Hbase write
通过编译 git 存储库 https://github.com/hortonworks-spark/shc 并将 shc jar 放在 spark jar 文件夹中来解决它。 并按照库尔卡尼@Aniket提出的链接
最终代码看起来像这样,
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
properties = {
"instanceId" : "hbase",
"zookeepers" : "10-x-x-x.local:2181,10-x-x-x.local:2181,10-x-x-x.local:2181",
"hbase.columns.mapping" : "KEY_FIELD STRING :key, A STRING c:a, B STRING c:b",
"hbase.use.hbase.context" : False,
"hbase.config.resources" : "file:///etc/hbase/conf/hbase-site.xml",
"hbase.table" : "test_table"
}
spark = SparkSession.builder
.appName("hbaseWrite")
.getOrCreate()
sc = spark.sparkContext
catalog = ''.join("""{
"table":{"namespace":"default", "name":"test_table"}
"rowkey":"key",
"columns":{
"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
"A":{"cf":"c", "col":"a", "type":"string"},
"B":{"cf":"c", "col":"b", "type":"string"}
}
}""".split())
data = [("3","DATA 3 A", "DATA 3 B")]
columns = ['KEY_FIELD','A','B']
cSchema = StructType([StructField(columnName, StringType()) for columnName in columns])
df = spark.createDataFrame(data, schema=cSchema)
df.write
.options(catalog=catalog)
.options( **properties)
.mode('overwrite').format("org.apache.spark.sql.execution.datasources.hbase").save()