在许多MapReduce程序中,我看到一个reducer也被用作组合器。我知道这是因为这些项目的特殊性质。但我想知道他们是否会有所不同。
是的,组合器可以不同于Reducer,尽管您的组合器仍将实现Reducer接口。组合器只能在依赖于工作的特定情况下使用。组合器的操作方式类似于还原器,但仅适用于每个映射器输出的关键点/值的子集。
与Reducer不同,Combiner将具有的一个约束是输入/输出键和值类型必须与Mapper的输出类型匹配。
是的,它们肯定会有所不同,但我认为你不想使用不同的类,因为大多数情况下你会得到意想不到的结果。
组合器只能用于交换函数(a.b=b.a)和关联函数{a.(b.c)=(a.b).c}。这也意味着组合器可能只对键和值的子集进行操作,或者可能根本不执行,但您希望程序的输出保持不变。
选择具有不同逻辑的不同类可能不会给出逻辑输出。
以下是实现,您可以在没有组合器和有组合器的情况下运行,两者给出的答案完全相同。这里的减速器和组合器有不同的动机和不同的实现方式。
package combiner;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class Map extends Mapper<LongWritable, Text, Text, Average> {
Text name = new Text();
String[] row;
protected void map(LongWritable offSet, Text line, Context context) throws IOException, InterruptedException {
row = line.toString().split(" ");
System.out.println("Key "+row[0]+"Value "+row[1]);
name.set(row[0]);
context.write(name, new Average(Integer.parseInt(row[1].toString()), 1));
}}
降低等级
public class Reduce extends Reducer<Text, Average, Text, LongWritable> {
LongWritable avg =new LongWritable();
protected void reduce(Text key, Iterable<Average> val, Context context)throws IOException, InterruptedException {
int total=0; int count=0; long avgg=0;
for (Average value : val){
total+=value.number*value.count;
count+=value.count;
avgg=total/count;
}
avg.set(avgg);
context.write(key, avg);
}
}
MapObject类
public class Average implements Writable {
long number;
int count;
public Average() {super();}
public Average(long number, int count) {
this.number = number;
this.count = count;
}
public long getNumber() {return number;}
public void setNumber(long number) {this.number = number;}
public int getCount() {return count;}
public void setCount(int count) {this.count = count;}
@Override
public void readFields(DataInput dataInput) throws IOException {
number = WritableUtils.readVLong(dataInput);
count = WritableUtils.readVInt(dataInput);
}
@Override
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeVLong(dataOutput, number);
WritableUtils.writeVInt(dataOutput, count);
}
}
组合器类
public class Combine extends Reducer<Text, Average, Text, Average>{
protected void reduce(Text name, Iterable<Average> val, Context context)throws IOException, InterruptedException {
int total=0; int count=0; long avg=0;
for (Average value : val){
total+=value.number;
count+=1;
avg=total/count;
}
context.write(name, new Average(avg, count));
}
}
驾驶员等级
public class Driver1 {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
if (args.length != 2) {
System.err.println("Usage: SecondarySort <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "CustomCobiner");
job.setJarByClass(Driver1.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Combine.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Average.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Git代码从这里
留下你的建议。。
组合器的主要目标是优化/最小化将在映射器和还原器之间的网络中进行混洗,从而尽可能节省带宽。
组合器的经验法则是它必须具有相同的输入和输出变量类型,原因是为此,是否不能保证组合器的使用,它可以使用还是不能使用,取决于音量以及泄漏次数。
当减速器满足这个规则,即输入和输出相同时,它可以用作组合器变量类型。
组合器的另一个最重要的规则是,它只能在您想要的函数时使用应用既是交换的又是结合的。比如加数字。但不是在像average这样的情况下(如果你使用和reducer相同的代码)。
现在回答你的问题,是的,当然它们可能会不同,当你的reducer有不同类型的输入和输出变量时,你别无选择,只能制作一个不同的reduce代码副本并修改它。
如果你关心reducer的逻辑,你也可以用不同的方式来实现,比如说,在组合器的情况下,你可以有一个集合对象来拥有一个本地缓冲区,其中包含所有进入组合器的值,这比在reducer中使用它的风险更小,因为在reduce器的情况中,它比在组合器中更容易耗尽内存。其他逻辑差异肯定存在,而且确实存在。