使用MultipleOutputs在MapReduce中写入HBase



我目前有一个MapReduce作业,它使用MultipleOutputs将数据发送到几个HDFS位置。 完成后,我使用 HBase 客户端调用(在 MR 之外)将一些相同的元素添加到几个 HBase 表中。 最好使用 TableOutputFormat 将 HBase 输出添加为额外的多个输出。 这样,我将分发我的HBase处理。

问题是,我无法让它工作。 有没有人在MultipleOutputs中使用过TableOutputFormat...? 具有多个 HBase 输出?

基本上,我正在设置我的收藏家,就像这样......

Outputcollector<ImmutableBytesWritable, Writable> hbaseCollector1 = multipleOutputs.getCollector("hbase1", reporter); 
Outputcollector<ImmutableBytesWritable, Writable> hbaseCollector2 = multipleOutputs.getCollector("hbase2", reporter); 
Put put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata1);
hbaseCollector1.collect(NullWritable.get(), put);
put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata2);
hbaseCollector2.collect(newImmutableBytesWritable(mykey.getBytes()), put);

我认为,这似乎遵循了hbase写作的一般思路。

当我键入此内容时,部分问题可能更多地出现在作业定义中。 看起来 MR(和 Hbase)想要一个全局参数集,就像这样......

conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");

以提供表名。 麻烦的是,我有两张桌子。

有什么想法吗?

谢谢

我已经以

3 种不同的方式将数据放入 HBase 中。最有效(和分布式)的是使用HFileOutputFormat类。

我按如下方式设置了工作...(请注意,这是根据实际代码编辑的,但核心内容仍然存在)

cubeBuilderETLJob.setJobName(jobName);
cubeBuilderETLJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
cubeBuilderETLJob.setMapOutputValueClass(Put.class);
cubeBuilderETLJob.setMapperClass(HiveToHBaseMapper.class);      
cubeBuilderETLJob.setJarByClass(CubeBuilderDriver.class);       
cubeBuilderETLJob.setInputFormatClass(TextInputFormat.class);
cubeBuilderETLJob.setOutputFormatClass(HFileOutputFormat.class);
HFileOutputFormat.setOutputPath(cubeBuilderETLJob, cubeOutputPath);
HTable hTable = null;
Configuration hConf = HBaseConfiguration.create(conf);
hConf.set("ZOOKEEPER_QUORUM", hbaseZookeeperQuorum);
hConf.set("ZOOKEEPER_CLIENTPORT", hbaseZookeeperClientPort);
hTable = new HTable(hConf, tableName);
HFileOutputFormat.configureIncrementalLoad(cubeBuilderETLJob, hTable);

正如我们所看到的,我的映射器类被称为HiveToHBaseMapper - 漂亮而原始。 :)这是它的(再次,粗略的)定义

public class HiveToHBaseMapper extends
    Mapper<WritableComparable, Writable, ImmutableBytesWritable, Put> {
@Override
public void map(WritableComparable key, Writable val, Context context)
    throws IOException, InterruptedException {
    Configuration config = context.getConfiguration();
    String family = config.get("FAMILY");
    Double value = Double.parseDouble(sValue);
    String sKey = generateKey(config);
    byte[] bKey = Bytes.toBytes(sKey);
    Put put = new Put(bKey);
    put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
        ? Bytes.toBytes(Double.MIN_VALUE)
        : Bytes.toBytes(value));        
    ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
    context.write(ibKey, put);
}

我不知道您是否可以使用它来将其放入MultipleOutputs中,或者需要创建一个新的MR工作。这是我遇到的将数据输入HBase的最佳方式。:)

这有望让您朝着正确的方向找到解决方案。

根据我的经验,最好的方法是在数据可用时立即将数据放入 hbase 表中(除非您正在批量加载数据)。 如果您的映射任务中有可用的数据,这是将其推送到 hbase 的最佳时机。 如果在 reduce 任务之前没有数据,则将推送添加到 hbase 那里。 在知道 HBase 是瓶颈之前,请让 HBase 担心缓存问题。

因此,显然,这在旧的mapred软件包中是不可能的。 mapreduce软件包集中有一个新的OutputFormat,但我现在不想转换为它。 因此,我将不得不编写多个 MR 作业。

最新更新