r语言 - 如何在 hadoop 集群中将数据从 SparkR 插入到 Hbase



我正在寻找有关将SparkR数据直接加载到HBase中的帮助。 读取函数正在工作,我能够使用 SparkR (sparkR.session( 从 Hive 外部表中读取数据

执行的步骤:

  • 已创建 HBase 表 (hbase_test1(
  • 在 Hive 中创建了一个外部表以映射 Hive 中的 HBase 表(test1(

法典:

library(SparkR)
sc <- sparkR.session(master = "local",sparkEnvir = list(spark.driver.memory="2g",enableHiveSupport=TRUE))
sqlContext <- sparkR.session(sc)
df <- sql("show tables")
collect(df)
sdf <- sql("SELECT * from test1")

这就是我的立场。

是否可以直接从 SparkR 将数据直接写入 HBase? 仅供参考:我需要将SparkR用于某些ML代码。结果需要保存回HBase。请注意,我正在使用所有开源工具。

无需额外部署,即可使用 ApacheSpark - Apache HBase Connector。

首先,您必须包含包。可以使用以下选项完成此操作*

spark.jars.packages  com.hortonworks:shc-core:1.1.1-2.1-s_2.11
spark.jars.repositories http://repo.hortonworks.com/content/groups/public/

在您的spark-defaults.conf或等效的命令行参数中用于spark-submit/SparkR

--packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 
--repositories http://repo.hortonworks.com/content/groups/public/ 

软件包的版本(上面s_2.11(必须与用于构建Spark的Scala版本相匹配。

现在假设您将表定义为

create 'FooBar', 'Foo', 'Bar'

并且您希望 SparkR 插入等效于:

put 'FooBar', '1000', 'Foo:Value', 'x1'
put 'FooBar', '1000', 'Bar:Value', 'y1'
put 'FooBar', '2000', 'Foo:Value', 'x2'
put 'FooBar', '2000', 'Bar:Value', 'y2'

您必须提供目录映射:

catalog = '{
"table":{"namespace":"default", "name":"FooBar"},
"rowkey":"key",
"columns":{
"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
"foo_value":{"cf":"Foo", "col":"Value", "type":"string"},
"bar_value":{"cf":"Bar", "col":"Value", "type":"string"}
}
}'

和输入表:

df <- createDataFrame(data.frame(
rowkey = c("1000", "2000"), foo_value = c("x1", "x2"), bar_value = c("y1", "y2")
))

最后,您可以使用以下选项应用write.ml

write.df(df, 
source = "org.apache.spark.sql.execution.datasources.hbase", 
mode = "append", catalog = catalog)

有关详细信息,请参阅官方连接器文档。

如果您不介意其他依赖项,则可以部署 Apache Phoenix,映射 HBase 表(例如检查 PHOENIX-447(,然后使用官方连接器或内置 JDBC 源代码来写入数据。

在额外的成本下,它将提供更好的用户体验。例如,如果将凤凰表定义为:

CREATE TABLE foobar (
id VARCHAR NOT NULL PRIMARY KEY, 
foo INTEGER, 
bar VARCHAR
); 

你可以

SparkR:::callJStatic(
"java.lang.Class", "forName",  
"org.apache.phoenix.jdbc.PhoenixDriver"
)

df <- createDataFrame(data.frame(
id = c("1000", "2000"), foo = c(1, 2), bar = c("x", "y")
))

write.df(
dfr, source = "org.apache.phoenix.spark", 
# Note that the only supported mode is `overwrite`, 
# which in fact works like `UPSERT`
mode = "overwrite",
table = "FooBar", 
# ZooKeeper URL
zkUrl = "host:port"  
)

与第一个选项类似,您必须包含相应的连接器。但是,与HBase连接器不同,它不能自给自足,并且需要CLASSPATH上的凤凰核心和客户端jars。


*不要忘记在将来调整软件包版本。

最新更新