我有两个 Hbase 表hbaseTable
、hbaseTable1
' 和 Hive 表 ' hiveTable
'我的查询如下所示:
'insert overwrite hiveTable select col1, h2.col2, col3 from hbaseTable h1,hbaseTable2 h2 where h1.col=h2.col2';
我需要在 hbase 中进行内部联接并将数据带到 hive。我们将 Hive 与 java 一起使用,它的性能非常差。因此,计划通过使用火花来改变方法。即,Spark with Java如何使用 SPARK 从我的 JAVA 代码连接到 hbase。
现在我的 Spark 代码应该在 HBase 中进行连接,并通过上述查询将数据引入 Hive。
请提供示例代码。
如果你使用 Spark 来加载 hbase 数据,那么为什么要在 hive 中加载它呢?你可以使用类似于hive的spark sql,因此可以使用sql。可以在不使用 hive 的情况下查询数据。例如:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
import java.util.Arrays;
public class SparkHbaseHive {
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
conf.set(TableInputFormat.INPUT_TABLE, "test");
JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("Spark-Hbase").setMaster("local[3]"));
JavaPairRDD<ImmutableBytesWritable, Result> source = jsc
.newAPIHadoopRDD(conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
SQLContext sqlContext = new SQLContext(jsc);
JavaRDD<Table1Bean> rowJavaRDD =
source.map((Function<Tuple2<ImmutableBytesWritable, Result>, Table1Bean>) object -> {
Table1Bean table1Bean = new Table1Bean();
table1Bean.setRowKey(Bytes.toString(object._1().get()));
table1Bean.setColumn1(Bytes.toString(object._2().getValue(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"))));
return table1Bean;
});
DataFrame df = sqlContext.createDataFrame(rowJavaRDD, Table1Bean.class);
//similarly create df2
//use df.join() and then register as joinedtable or register two tables and join
//execute sql queries
//Example of sql query on df
df.registerTempTable("table1");
Arrays.stream(sqlContext.sql("select * from table1").collect()).forEach(row -> System.out.println(row.getString(0) + "," + row.getString(1)));
}
}
public class Table1Bean {
private String rowKey;
private String column1;
public String getRowKey() {
return rowKey;
}
public void setRowKey(String rowKey) {
this.rowKey = rowKey;
}
public String getColumn1() {
return column1;
}
public void setColumn1(String column1) {
this.column1 = column1;
}
}
如果由于某些原因需要使用 Hive,请使用 HiveContext 从 Hive 读取并使用 saveAsTable 保留数据。如有疑问,请告诉我。