我运行了一个简单的wordcount MapReduce示例,添加了一个组合器,在组合器输出中有一个小的变化,组合器的输出不被reducer合并。场景如下
测试:Map ->组合器->减速器
在组合器中,我添加了两行额外的行来输出一个不同的单词并计数1,reducer不计算"不同"的单词计数。输出粘贴在下面。
Text = new Text("different");//添加了我自己的输出
上下文。write(t, new IntWritable(1));//添加了我自己的输出
public class wordcountcombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
context.write(key, new IntWritable(sum));
Text t = new Text("different"); // Added my own output
context.write(t, new IntWritable(1)); // Added my own output
}
}
输入:输出:我运行了一个简单的wordcount MapReduce示例,添加了一个组合器,在组合器输出中有一个小的变化,组合器的输出不被reducer合并。场景如下在组合器中,我添加了两个额外的行来输出不同的单词并计数1,减速器不计算"不同"的单词计数。输出粘贴在下面。
"different" 1
different 1
different 1
I 2
different 1
In 1
different 1
MapReduce 1
different 1
The 1
different 1
...
怎么会这样?
fullcode:我用组合器运行单词计数程序,只是为了好玩,我在组合器中调整了一下,所以我遇到了这个问题。我有三个独立的类映射器,组合器和减速器。
司机:
public class WordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Job job = Job.getInstance(new Configuration());
job.setJarByClass(wordcountmapper.class);
job.setJobName("Word Count");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(wordcountmapper.class);
job.setCombinerClass(wordcountcombiner.class);
job.setReducerClass(wordcountreducer.class);
job.getConfiguration().set("fs.file.impl", "com.conga.services.hadoop.patch.HADOOP_7682.WinLocalFileSystem");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
}
}
映射器:
public class wordcountmapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text word = new Text();
IntWritable one = new IntWritable(1);
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
String line = value.toString();
StringTokenizer token = new StringTokenizer(line);
while (token.hasMoreTokens())
{
word.set(token.nextToken());
context.write(word, one);
}
}
}
组合器:
public class wordcountcombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
context.write(key, new IntWritable(sum));
Text t = new Text("different");
context.write(t, new IntWritable(1));
}
}
减速器:
public class wordcountreducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
输出是正常的,因为你有两行做错了:为什么要写这个代码
Text t = new Text("different"); // Added my own output
context.write(t, new IntWritable(1)); // Added my own output
在你的减速器中,你在做求和,然后你在输出中添加不同的1 ....
您在作业的最终输出中编写了reduce
函数中的新"1 different"
,而没有进行任何类型的聚合。reduce
函数每个键被调用一次,正如你在方法签名中看到的,它接受一个键和该键的值列表作为参数,这意味着它对每个键被调用一次。
由于您使用关键字,并且在reduce
的每个调用中您都写入输出"1 different"
,因此您将为输入数据中的每个单词获得其中一个。
hadoop要求组合器中的reduce方法只写入它接收到的作为输入的相同键。这是必需的,因为hadoop只在调用组合器之前对键进行排序,它不会在组合器运行后对它们重新排序。在您的程序中,除了作为输入接收到的键之外,reduce方法还写入键"different"。这意味着键"different"随后在键的不同顺序中出现在不同的位置,并且在它们被传递给reducer之前不会合并这些出现。
例如:假设映射器输出的键的排序列表为:"alpha", "beta", "gamma"
您的组合器然后被调用三次(一次用于"alpha"
,一次用于"beta"
,一次用于"gamma"
),并产生密钥"alpha", "different"
,然后密钥"beta", "different"
,然后密钥"gamma", "different"
。
在组合器执行后,"sorted"
(但实际上没有排序)键列表为:
"alpha", "different", "beta", "different", "gamma", "different"
这个列表不会再次排序,所以不同出现的"different"不会被合并。
分别调用减速器6次,键"different"在减速器输出中出现3次