Stopping the Reduce function in Hadoop under a condition



I have a reduce function, and I want to stop it after processing some 'n' keys. I have set up a counter that is incremented for each key, and I return from the reduce function when the condition is met.

Code

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class wordcount {

        public static class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private IntWritable leng = new IntWritable();

            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    String lword = tokenizer.nextToken();
                    leng.set(lword.length());
                    context.write(leng, one); // emit (word length, 1)
                }
            }
        }

        public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
            private int count = 0; // number of keys this reducer has processed so far

            public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                count++; // one more key handled
                context.write(key, new IntWritable(sum));
                if (count > 19) return; // only ends this call; the framework still invokes reduce() for the next key
            }
        }
    }
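For completeness, a minimal driver for this job might look like the sketch below. The driver class name and the input/output paths taken from `args` are my own additions, assuming the new `org.apache.hadoop.mapreduce` API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordLengthDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "word length count");
            job.setJarByClass(wordcount.class);
            job.setMapperClass(wordcount.Map.class);
            job.setReducerClass(wordcount.Reduce.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));   // input directory
            FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory (must not exist)
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }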

Is there any other way to do this?

You can achieve this by overriding run() of the Reducer class (new API):

    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        // reduce() method here

        // Override run() so reduce() is invoked only for the first n keys
        @Override
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            int count = 0;
            while (context.nextKey()) {
                if (count++ < n) {
                    reduce(context.getCurrentKey(), context.getValues(), context);
                } else {
                    // exit or do whatever you want
                }
            }
            cleanup(context);
        }
    }
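A natural follow-up is where `n` comes from. One option (a sketch of my own, not part of the original answer) is to pass the threshold through the job Configuration under an arbitrary property name, here `"reduce.key.limit"`, and read it back in setup():

    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private int n;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // "reduce.key.limit" is a made-up property name; set it in the driver
            // with conf.setInt("reduce.key.limit", 20) before submitting the job
            n = context.getConfiguration().getInt("reduce.key.limit", 20);
        }

        // reduce() method as before

        @Override
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            int count = 0;
            while (context.nextKey()) {
                if (count++ < n) {
                    reduce(context.getCurrentKey(), context.getValues(), context);
                } else {
                    break; // stop pulling keys; the task finishes without touching the rest
                }
            }
            cleanup(context);
        }
    }

Breaking out of the loop means the remaining keys are never read, so the task ends early instead of iterating over input it would ignore anyway.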
