我想写一个mapreduce代码来计算给定CSV文件中的记录数。我不知道在地图上该做什么,在减少中该做什么。我应该如何解决这个问题?有人能提出建议吗?
- 每次读取记录,地图应发出1
- 组合器应该发出它得到的所有"1"的总和(每个映射的小计)
- 您的reducer应该发出记录的总数
映射程序必须发出一个固定键(只需使用值为"count"的Text),一个固定值为1的键(与wordcount示例中看到的相同)。
然后简单地使用LongSumReducer作为您的减速器。
你的工作输出将是一个键为"count"的记录,该值是你要查找的记录数。
您可以选择使用相同的LongSumReducer作为组合器来(显著地!)提高性能。
希望我有一个比公认答案更好的解决方案。
与其为每条记录发射1,为什么不在map()中增加一个计数器,并在cleanup()中的每个map任务之后发射递增的计数器呢。
可以减少中间读写操作。而reducer只需要聚合少数值的列表。
public class LineCntMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
Text keyEmit = new Text("Total Lines");
IntWritable valEmit = new IntWritable();
int partialSum = 0;
public void map(LongWritable key, Text value, Context context) {
partialSum++;
}
public void cleanup(Context context) {
valEmit.set(partialSum);
context.write(keyEmit, valEmit);
}
}
您可以在这里找到完整的工作代码
使用job.getcounter()检索作业完成后每个记录增加的值。如果您使用java编写mapreduce作业,则使用enum进行计数机制。
我只需要使用身份映射器和身份还原器。
这是Mapper.class和Reducer.class。然后只需读取map input records
你真的不需要做任何编码就可以得到这个。
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class LineCount
{
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text("Total Lines");
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,Reporter reporter)
throws IOException
{
output.collect(word, one);
}
}
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(LineCount.class);
conf.setJobName("LineCount");
conf.setNumReduceTasks(5);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}