如何使用TableMapReduceUtil在hbase扫描仪结果上运行mapreduce




我的hbase表看起来像这样:

    key---------value
    id1/bla     value1
    id1/blabla  value2
    id2/bla     value3
    id2/blabla  value4
    ....

有数百万个密钥以id1开头,也有数百万个关键字以id2开头。

我想用mapReduce从hbase读取数据,因为有很多密钥以相同的Id开头,每个Id一个映射还不够好。我更喜欢每个Id 100个映射器

我希望在同一个已按id筛选的scannerResult上运行多个映射程序。我阅读了TableMapReduceUtil,并尝试了以下操作:

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
    sourceTable,        // input table
    scan,               // Scan instance to control CF and attribute selection
    MyMapper.class,     // mapper class
    Text.class,         // mapper output key
    IntWritable.class,  // mapper output value
    job);


有了这样的映射函数(它应该迭代扫描仪结果(:

public static class MyMapper extends TableMapper<Text, IntWritable>  {
    private final IntWritable ONE = new IntWritable(1);
    private Text text = new Text();
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
            text.set("123");     // we can only emit Writables...    
            context.write(text, ONE);
    }
}
<br>



我的问题是:

  1. 映射函数如何可能作为输入Result而不是ResultScanner?我知道扫描的结果可以由ResultScanner迭代,ResultScanner可以由result迭代。ResultScanner有Result的列表\数组,不是吗
  2. 如何在map函数中迭代扫描程序的结果
  3. 我该如何控制这个函数的拆分数量。如果它只打开10个映射器,而我想要20个。是否可以更改某些内容
  4. 有没有最简单的方法来实现我的目标

我将从您列表中的#4开始:

默认行为是为每个区域创建一个映射器。因此,与其试图破解TableInputFormat以根据您的规范创建自定义输入拆分,不如首先考虑将数据拆分为100个区域(然后您将拥有相当平衡的100个映射器(。

这种方法提高了您的读写性能,因为您不太容易受到热点攻击(假设集群中有一到两个以上的区域服务器(。

实现这一点的首选方法是预拆分表(即在创建表时定义拆分(。

相关内容

  • 没有找到相关文章

最新更新