Reducer setup()的Mapper是用来做什么的?



设置和清理方法究竟用于什么?我试图找出它们的含义,但还没有人确切地描述它们的作用。例如,setup方法如何使用来自输入分割的数据?它是一个整体吗?还是一行一行的?

如前所述,setup()cleanup()是您可以覆盖的方法,如果您选择的话,它们在那里供您初始化和清理map/reduce任务。实际上,在这些阶段中,您不能直接访问来自输入分割的任何数据。map/reduce任务的生命周期是(从程序员的角度来看):

setup -> map -> cleanup

setup -> reduce -> cleanup

通常在setup()期间发生的情况是,您可以从配置对象中读取参数以自定义处理逻辑。

cleanup()期间通常发生的情况是,您清理可能已经分配的任何资源。还有其他用途,那就是清除所有累积的聚合结果。

setup()cleanup()方法只是你的"钩子",开发者/程序员,有机会在你的map/reduce任务之前和之后做一些事情。

例如,在规范的单词计数示例中,假设您想要从计数中排除某些单词(例如,停止词,如"the","a","be"等)。在配置MapReduce Job时,可以将这些单词的列表(以逗号分隔)作为参数(键值对)传递给配置对象。然后在您的地图代码中,在setup()期间,您可以获取停止词并将它们存储在某个全局变量中(映射任务的全局变量),并在映射逻辑期间排除对这些词的计数。下面是一个修改后的示例,来自http://wiki.apache.org/hadoop/WordCount。

public class WordCount {
 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private Set<String> stopWords;
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        stopWords = new HashSet<String>();
        for(String word : conf.get("stop.words").split(",")) {
            stopWords.add(word);
        }
    }
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            if(stopWords.contains(token)) {
                continue;
            }
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
 } 
 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
 }
 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("stop.words", "the, a, an, be, but, can");
    Job job = new Job(conf, "wordcount");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(true);
 }
}

setupcleanup对每个任务调用一次。
例如,你有5个映射器在运行,对于每个映射器你想初始化一些值,那么你可以使用setup。你的设置方法被调用了5次。
因此,对于每个mapreduce首先调用setup()方法,然后调用map()/reduce()方法,然后在退出任务之前调用cleanup()方法。

setup: Called once at the beginning of the task.

你可以在这里自定义初始化。

cleanup: Called once at the end of the task.

你可以把资源释放放在这里

相关内容

  • 没有找到相关文章

最新更新