saveAsHadoopDataset never closes its connection to ZooKeeper



I am writing to HBase with the following code:

    jsonDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) throws Exception {
            // Parse the incoming JSON batch and keep only the fields to be stored
            DataFrame jsonFrame = sqlContext.jsonRDD(rdd);
            DataFrame selecteFieldFrame = jsonFrame.select("id_str", "created_at", "text");

            // HBase / ZooKeeper connection settings
            Configuration config = HBaseConfiguration.create();
            config.set("hbase.zookeeper.quorum", "d-9543");
            config.set("zookeeper.znode.parent", "/hbase-unsecure");
            config.set("hbase.zookeeper.property.clientPort", "2181");

            // Old mapred API: JobConf + org.apache.hadoop.hbase.mapred.TableOutputFormat
            final JobConf jobConfig = new JobConf(config, SveAsHadoopDataSetExample.class);
            jobConfig.setOutputFormat(TableOutputFormat.class);
            jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "tableName");

            selecteFieldFrame.javaRDD().mapToPair(new PairFunction<Row, ImmutableBytesWritable, Put>() {
                @Override
                public Tuple2<ImmutableBytesWritable, Put> call(Row row) throws Exception {
                    return convertToPut(row);
                }
            }).saveAsHadoopDataset(jobConfig);

            return null;
        }
    });
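
(For reference, `convertToPut` is not shown in the question; a hypothetical version, assuming `id_str` is used as the row key and that `created_at` and `text` go into a column family named `cf`, might look like this:)

    private static Tuple2<ImmutableBytesWritable, Put> convertToPut(Row row) {
        // Assumption: column 0 = id_str (row key), 1 = created_at, 2 = text
        byte[] rowKey = Bytes.toBytes(row.getString(0));
        Put put = new Put(rowKey);
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("created_at"), Bytes.toBytes(row.getString(1)));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("text"), Bytes.toBytes(row.getString(2)));
        return new Tuple2<>(new ImmutableBytesWritable(rowKey), put);
    }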

But when I look at zkDump in ZooKeeper, the number of connections keeps growing.

Any suggestions/pointers would be a great help!

I had the same problem. It is an HBase bug, and this is how I fixed it:

Change org.apache.hadoop.hbase.mapred.TableOutputFormat to org.apache.hadoop.hbase.mapreduce.TableOutputFormat, and use org.apache.hadoop.mapreduce.Job instead of org.apache.hadoop.mapred.JobConf.

Here is an example:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Durability, Put}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zk_hosts)
    conf.set("hbase.zookeeper.property.clientPort", zk_port)
    conf.set(TableOutputFormat.OUTPUT_TABLE, "TABLE_NAME")

    // New mapreduce API: Job instead of JobConf
    val job = Job.getInstance(conf)
    job.setOutputFormatClass(classOf[TableOutputFormat[String]])

    // formatedLines is an RDD of (rowKey, node, topic) tuples
    formatedLines.map {
      case (a, b, c) =>
        val row = Bytes.toBytes(a)
        val put = new Put(row)
        put.setDurability(Durability.SKIP_WAL)
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("node"), Bytes.toBytes(b))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("topic"), Bytes.toBytes(c))
        (new ImmutableBytesWritable(row), put)
    }.saveAsNewAPIHadoopDataset(job.getConfiguration)
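
Since the code in the question is Java, here is a minimal sketch of the same change applied there, reusing the question's ZooKeeper settings and table name inside the foreachRDD call; the `convertToPut` mapping is the question's own helper (its body is an assumption, see the sketch above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    // Same connection settings as before; the table name now goes into the
    // mapreduce TableOutputFormat's OUTPUT_TABLE key on the Configuration
    Configuration config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.quorum", "d-9543");
    config.set("zookeeper.znode.parent", "/hbase-unsecure");
    config.set("hbase.zookeeper.property.clientPort", "2181");
    config.set(TableOutputFormat.OUTPUT_TABLE, "tableName");

    // New mapreduce API: Job instead of JobConf
    Job job = Job.getInstance(config);
    job.setOutputFormatClass(TableOutputFormat.class);

    selecteFieldFrame.javaRDD()
        .mapToPair(new PairFunction<Row, ImmutableBytesWritable, Put>() {
            @Override
            public Tuple2<ImmutableBytesWritable, Put> call(Row row) throws Exception {
                return convertToPut(row); // same Row-to-Put mapping as in the question
            }
        })
        .saveAsNewAPIHadoopDataset(job.getConfiguration());

The point of the change, as described above, is that the mapreduce-based output format releases its HBase/ZooKeeper connection when the record writer is closed, whereas the old mapred one leaked it on every batch.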

This may also help:

https://github.com/hortonworks-spark/shc/pull/20/commits/2074067c42c5a454fa4cdeec18c462b5367f23b9
