如何通过执行内部联接并将其引入配置单元来从 hbase 表中检索数据



我有两个 Hbase 表hbaseTablehbaseTable1 ' 和 Hive 表 ' hiveTable '我的查询如下所示:

'insert overwrite hiveTable select col1, h2.col2, col3 from hbaseTable h1,hbaseTable2 h2 where h1.col=h2.col2';

我需要在 hbase 中进行内部联接并将数据带到 hive。我们将 Hive 与 java 一起使用,它的性能非常差。因此,计划通过使用火花来改变方法。即,Spark with Java如何使用 SPARK 从我的 JAVA 代码连接到 hbase。

现在我的 Spark 代码应该在 HBase 中进行连接,并通过上述查询将数据引入 Hive。

请提供示例代码。

如果你使用 Spark 来加载 hbase 数据,那么为什么要在 hive 中加载它呢?你可以使用类似于hive的spark sql,因此可以使用sql。可以在不使用 hive 的情况下查询数据。例如:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
import java.util.Arrays;
public class SparkHbaseHive {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableInputFormat.INPUT_TABLE, "test");
        JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("Spark-Hbase").setMaster("local[3]"));
        JavaPairRDD<ImmutableBytesWritable, Result> source = jsc
                .newAPIHadoopRDD(conf, TableInputFormat.class,
                        ImmutableBytesWritable.class, Result.class);
        SQLContext sqlContext = new SQLContext(jsc);
        JavaRDD<Table1Bean> rowJavaRDD = 
source.map((Function<Tuple2<ImmutableBytesWritable, Result>, Table1Bean>) object -> {
            Table1Bean table1Bean = new Table1Bean();
            table1Bean.setRowKey(Bytes.toString(object._1().get()));

table1Bean.setColumn1(Bytes.toString(object._2().getValue(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"))));
            return table1Bean;
    });
        DataFrame df = sqlContext.createDataFrame(rowJavaRDD, Table1Bean.class);
        //similarly create df2
        //use df.join() and then register as joinedtable or register two tables and join
        //execute sql queries
        //Example of sql query on df
        df.registerTempTable("table1");
        Arrays.stream(sqlContext.sql("select * from table1").collect()).forEach(row -> System.out.println(row.getString(0) + "," + row.getString(1)));
    }
}
public class Table1Bean {
    private String rowKey;
    private String column1;

    public String getRowKey() {
        return rowKey;
    }
    public void setRowKey(String rowKey) {
        this.rowKey = rowKey;
    }
    public String getColumn1() {
        return column1;
    }
    public void setColumn1(String column1) {
        this.column1 = column1;
    }
}

如果由于某些原因需要使用 Hive,请使用 HiveContext 从 Hive 读取并使用 saveAsTable 保留数据。如有疑问,请告诉我。

相关内容

  • 没有找到相关文章

最新更新