Spark YARN cluster fails to connect to HBase



I have an application that parses VCF files and inserts the data into HBase. The application runs fine with Apache Spark when the master is set to local, but when I run it on a Spark YARN cluster it fails with the following error:

17/03/31 10:36:09 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:10 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:11 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:12 INFO yarn.Client: Application report for application_1490344846293_0020 (state: FINISHED)
17/03/31 10:36:12 INFO yarn.Client: 
     client token: N/A
     diagnostics: User class threw exception: java.lang.RuntimeException: org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the locations
     ApplicationMaster host: 192.168.0.14
     ApplicationMaster RPC port: 0
     queue: default
     start time: 1490956367991
     final status: FAILED
     tracking URL: http://master1:8088/proxy/application_1490344846293_0020/
     user: ubuntu
Exception in thread "main" org.apache.spark.SparkException: Application application_1490344846293_0020 finished with failed status
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1167)
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1213)
    at org.apache.spark.deploy.yarn.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/03/31 10:36:12 INFO util.ShutdownHookManager: Shutdown hook called
17/03/31 10:36:12 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-e6867ef3-fad4-424a-b6d3-f79f48bd65ea

I use the following code to connect to HBase:

    package com.mycompany.app;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.spark.api.java.function.VoidFunction;
    import java.io.IOException;
    class HbaseAppender implements VoidFunction<Put> {

        private final String TABLE_NAME = "data";
        private final String COLUMN_FAMILY_NAME = "v_data";

        static private HTable hTable;

        public HbaseAppender(){
            init();
        }
        //method used to establish connection to Hbase
        private void init(){
            TableName tableName;
            Configuration hconf = HBaseConfiguration.create();
            hconf.set("hbase.zookeeper.property.clientPort", "2181");
            hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
            hconf.set("hbase.zookeeper.quorum", "master1");
            try {
                Connection connection = ConnectionFactory.createConnection();
                Admin admin = connection.getAdmin();
                tableName = TableName.valueOf(TABLE_NAME);
                if(!admin.tableExists(tableName)) {
                    HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(COLUMN_FAMILY_NAME);
                    HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
                    hTableDescriptor.addFamily(hColumnDescriptor);
                    admin.createTable(hTableDescriptor);
                }
                this.hTable = new HTable(tableName, connection);
            }
            catch (IOException e){
                throw new RuntimeException(e);
            }
        }
        @Override
        public void call(Put put) throws Exception {
            this.hTable.put(put);
        }
    }

If we run on the cluster with a single connection, it does not work, because the connection cannot be shipped to every node (it is not serializable), and we cannot create a connection for every element of the RDD either. The solution is therefore to use saveAsNewAPIHadoopDataset, which creates one connection per node of the cluster and saves all elements of the RDD to HBase (or to HDFS, depending on the configuration).
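
For reference, below is a minimal sketch of that approach using the Spark Java API together with HBase's TableOutputFormat. It is not the original code: the JavaRDD<Put> parameter, the class name HbaseBulkWriter and its static save method are assumptions made for illustration; the table name and ZooKeeper settings simply reuse the values from the question.

    package com.mycompany.app;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;
    import java.io.IOException;

    public class HbaseBulkWriter {

        // Saves an RDD of Puts to the "data" table through TableOutputFormat.
        // Each executor opens its own HBase connection, so no connection object
        // ever has to be serialized and shipped with the job.
        public static void save(JavaRDD<Put> puts) throws IOException {
            Configuration hconf = HBaseConfiguration.create();
            hconf.set("hbase.zookeeper.quorum", "master1");
            hconf.set("hbase.zookeeper.property.clientPort", "2181");
            hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
            hconf.set(TableOutputFormat.OUTPUT_TABLE, "data");

            Job job = Job.getInstance(hconf);
            job.setOutputFormatClass(TableOutputFormat.class);
            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(Put.class);

            // TableOutputFormat consumes (row key, Put) pairs.
            JavaPairRDD<ImmutableBytesWritable, Put> pairs = puts.mapToPair(
                    put -> new Tuple2<>(new ImmutableBytesWritable(put.getRow()), put));

            pairs.saveAsNewAPIHadoopDataset(job.getConfiguration());
        }
    }

Note that TableOutputFormat does not create the table: the "data" table with its "v_data" column family still has to exist (or be created once on the driver, as in the init() method above) before the Puts are written.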
