如何在flink流作业中读写HBase



如果我们必须在流应用程序中读写HBASE,我们怎么做呢?我们通过open方法打开一个写连接,那么我们如何打开一个读连接呢?

object test {
    if (args.length != 11) {
      //print args
      System.exit(1)
    }
    val Array() = args
    println("Parameters Passed " + ...);
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", metadataBrokerList)
    properties.setProperty("zookeeper.connect", zkQuorum)
    properties.setProperty("group.id", group)

    val messageStream = env.addSource(new FlinkKafkaConsumer08[String](topics, new SimpleStringSchema(), properties))
    messageStream.map { x => getheader(x) }


    def getheader(a: String) {
        //Get header and parse and split the headers
                if (metadata not available hit HBASE) { //Device Level send(Just JSON)
            //How to read from HBASE here .
                      } 
                      //If the resultset is not available in Map fetch from phoenix
                      else {
                          //fetch from cache
                      }
     }


    }
   messageStream.writeUsingOutputFormat(new HBaseOutputFormat());
   env.execute()
}

现在在方法getheader中,如果我想从if(metadata not available hit HBASE)中读取HBASE,我该怎么做呢?我不想在这里打开连接,我的想法是为一个线程维护一个连接并重用它,就像flink对HBASE的sink使用open()方法或spark对foreachpartition的处理一样。我试过这个,但我不能传递StreamExecutionEnvironment方法。我怎么能做到这一点,有人能提供一个片段吗?

您希望从流用户函数中读取/写入Apache HBase。您链接的hbasereadeexample正在做一些不同的事情:它将HBase表读入DataSet (Flink的批处理抽象)。在用户函数中使用此代码意味着从Flink程序中启动Flink程序。

对于您的用例,您需要在您的user函数中直接创建一个HBase客户端并与之交互。最好的方法是使用RichFlatMapFunction并在open()方法中创建到HBase的连接。

下一个版本的Flink(1.2.0)将支持用户函数中的异步I/O操作,这将显著提高应用程序的吞吐量。

相关内容

  • 没有找到相关文章

最新更新