化简器多次接收相同的值,而不是预期的输入

  • 本文关键字: hadoop mapreduce
  • 更新时间 :
  • 英文 :


在我的本地Hadoop环境中编写map-reduce作业时,我遇到了Reducer没有收到我期望的值的问题。我将问题抽象为以下内容:

我创建了一个包含 10 行的任意输入文件,以使 map 方法执行 10 次。在映射器中,我创建一个调用计数,并将该计数作为值写入输出,如果值为偶数,则以 0 作为键,如果值为奇数,则为 1 作为键,即以下(键、值)对:

(1,1)、(0,2)、(1,3)、(

0,4)、(1,5)等

我希望收到两个给减速器的电话

  • 0> [2,4,6,8,10]
  • 1> [1,3,5,7,9]

但我接到两个电话

  • 0> [2,2,2,2,2]
  • 1> [1,1,1,1,1]

相反。似乎我收到了在映射器中写入的第一个值,其中包含键的多重性(如果我反转计数器,我会收到值 10 和 9 而不是 2 和 1)。据我了解,这不是预期的行为(?),但我无法弄清楚我做错了什么。

我使用以下映射器和化简器:

public class TestMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    int count = 0;
    @Override
    protected void map(LongWritable keyUnused, Text valueUnused, Context context) throws IOException, InterruptedException {
        count += 1;
        context.write(new IntWritable(count % 2), new IntWritable(count));
        System.err.println((count % 2) + "|" + count);
    }
}
public class TestReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> valueItr, Context context) throws IOException, InterruptedException {
        List<IntWritable> values = Lists.newArrayList(valueItr);
        System.err.println(key + "|" + values);
    }
}

我使用本地测试运行器运行Hadoop作业,例如在"Hadoop:权威指南"(O'Reilly)一书中描述的那样:

public class TestDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        Job jobConf = Job.getInstance(getConf());
        jobConf.setJarByClass(getClass());
        jobConf.setJobName("TestJob");  
        jobConf.setMapperClass(TestMapper.class);
        jobConf.setReducerClass(TestReducer.class);
        FileInputFormat.addInputPath(jobConf, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
        jobConf.setOutputKeyClass(IntWritable.class);
        jobConf.setOutputValueClass(IntWritable.class);
        return jobConf.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new TestDriver(), args));
}

打包在一个罐子里,用'Hadoop jar test.jar infile.txt/tmp/testout'运行。

Hadoop在流式传输reducer值时重用值对象。

因此,为了捕获所有不同的值,您需要复制:

@Override
protected void reduce(IntWritable key, Iterable<IntWritable> valueItr, Context context) throws  IOException, InterruptedException {        
    List<IntWritable> values = Lists.newArrayList();
    for(IntWritable writable : valueItr) {
        values.add(new IntWritable(writable.get());
    }
    System.err.println(key + "|" + values);
}

相关内容

  • 没有找到相关文章

最新更新