编写用于计数记录数的MApreduce代码



我想写一个mapreduce代码来计算给定CSV文件中的记录数。我不知道在地图上该做什么,在减少中该做什么。我应该如何解决这个问题?有人能提出建议吗?

  • 每次读取记录,地图应发出1
  • 组合器应该发出它得到的所有"1"的总和(每个映射的小计)
  • 您的reducer应该发出记录的总数

映射程序必须发出一个固定键(只需使用值为"count"的Text),一个固定值为1的键(与wordcount示例中看到的相同)。

然后简单地使用LongSumReducer作为您的减速器。

你的工作输出将是一个键为"count"的记录,该值是你要查找的记录数。

您可以选择使用相同的LongSumReducer作为组合器来(显著地!)提高性能。

希望我有一个比公认答案更好的解决方案。

与其为每条记录发射1,为什么不在map()中增加一个计数器,并在cleanup()中的每个map任务之后发射递增的计数器呢。

可以减少中间读写操作。而reducer只需要聚合少数值的列表。

public class LineCntMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {
 Text keyEmit = new Text("Total Lines");
 IntWritable valEmit = new IntWritable();
 int partialSum = 0;
 public void map(LongWritable key, Text value, Context context) {
  partialSum++;
 }
 public void cleanup(Context context) {
  valEmit.set(partialSum);
   context.write(keyEmit, valEmit);
 }
}

您可以在这里找到完整的工作代码

使用job.getcounter()检索作业完成后每个记录增加的值。如果您使用java编写mapreduce作业,则使用enum进行计数机制。

我只需要使用身份映射器和身份还原器。

这是Mapper.class和Reducer.class。然后只需读取map input records

你真的不需要做任何编码就可以得到这个。

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class LineCount 
{
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> 
{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text("Total Lines");
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output,Reporter reporter)
            throws IOException 
    {
        output.collect(word, one);
    }
}
public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(LineCount.class);
    conf.setJobName("LineCount");
    conf.setNumReduceTasks(5);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
}
}

相关内容

  • 没有找到相关文章

最新更新