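I want my first reduce task to produce something like (course, <sum, count>), and in the second reduce task I will compute sum/count for each course. The first reduce task acts as a combiner, computing the sum and the count; the second reduce task computes and outputs the average for each course. I just can't find the best type for storing the output values as a pair so that I can later retrieve them and compute with them. A HashMap doesn't work.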
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class AvgGrading {
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "avg grading");
        job.setJarByClass(AvgGrading.class);
        job.setMapperClass(MapForAverage.class);
        job.setCombinerClass(ReduceForAverage.class);
        job.setNumReduceTasks(2);
        job.setReducerClass(ReduceForFinal.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Object.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class MapForAverage extends Mapper<LongWritable, Text, LongWritable, Object> {
        public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
            String[] word = value.toString().split(", ");
            float grade = Integer.parseInt(word[1]);
            int course = Integer.parseInt(word[0]);
            Map<Float, Long> m = new HashMap<Float, Long>();
            m.put(grade, (long) 1);
            con.write(new LongWritable(course), m);
        }
    }

    public static class ReduceForAverage extends Reducer<LongWritable, Object, LongWritable, Object> {
        private FloatWritable result = new FloatWritable();

        public void reduce(LongWritable course, Map<Float, Long> values, Context con)
                throws IOException, InterruptedException {
            Map<Float, Long> m = new HashMap<Float, Long>();
            float sum = 0;
            long count = 0;
            for (Map.Entry<Float, Long> entry : values.entrySet()) {
                sum += entry.getKey();
                count++;
            }
            m.put(sum, count);
            con.write(course, m);
        }
    }

    public static class ReduceForFinal extends Reducer<LongWritable, Object, LongWritable, FloatWritable> {
        private FloatWritable result = new FloatWritable();

        public void reduce(LongWritable course, Map<Long, Float> values, Context con)
                throws IOException, InterruptedException {
            long key = 0;
            float value = 0;
            for (Map.Entry<Long, Float> entry : values.entrySet()) {
                key = entry.getKey();
                value = entry.getValue();
            }
            float res = key / value;
            con.write(course, new FloatWritable(res));
        }
    }
}
Note that I can't iterate over an Iterable<Map<Float,Int>> in the reduce task, so I pass a plain Map instead, which is probably not correct.
The error is:
Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.NullPointerException
The second reducer fails.
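Map does not implement Writable. You declared Object as the input value class of your combiner and reducer, but what you actually emit is a Map. This is also why the job fails at startup: Hadoop cannot find a serializer for the declared map output value class (Object), so it cannot initialize the map output buffer. What you need is a custom class, and remember that any custom class you want to emit in Hadoop must implement Writable. Here is what you can do: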
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class Counter implements Writable {
    private float sum;
    private long count;

    // Hadoop instantiates Writables by reflection and then calls readFields(),
    // so a no-argument constructor is required.
    public Counter() {
    }

    public Counter(float sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    /* Methods to get and set private variables of the class */
    public float getSum() {
        return sum;
    }

    public void setSum(float sumValue) {
        sum = sumValue;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long countValue) {
        count = countValue;
    }

    /* Methods to serialize and deserialize the contents of the
       instances of this class */
    @Override /* Serialize the fields of this object to out */
    public void write(DataOutput out) throws IOException {
        out.writeFloat(sum);
        out.writeLong(count);
    }

    @Override /* Deserialize the fields of this object from in, in the same order */
    public void readFields(DataInput in) throws IOException {
        sum = in.readFloat();
        count = in.readLong();
    }
}
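To see what the write/readFields pair does without a cluster, here is a minimal plain-Java sketch. SumCountRecord is a hypothetical stand-in for Counter that drops the Hadoop Writable interface so it runs with only java.io; it writes a record to bytes and reads it back into a fresh, empty instance, which is roughly how values travel from the map side to the reduce side. The two methods must handle the fields in the same order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Counter without the Hadoop Writable interface,
// so the serialization contract can be exercised in plain Java.
class SumCountRecord {
    float sum;
    long count;

    SumCountRecord() { } // empty instance, filled later by readFields()

    SumCountRecord(float sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Same field order as Counter.write(): float first, then long.
    void write(DataOutput out) throws IOException {
        out.writeFloat(sum);
        out.writeLong(count);
    }

    // Reads the fields back in exactly the order write() produced them.
    void readFields(DataInput in) throws IOException {
        sum = in.readFloat();
        count = in.readLong();
    }

    // Serializes a record to a byte array.
    static byte[] serialize(SumCountRecord r) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            r.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Creates an empty record and populates it from the bytes.
    static SumCountRecord deserialize(byte[] data) {
        SumCountRecord r = new SumCountRecord();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            r.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return r;
    }
}
```

A float plus a long serializes to 12 bytes. If readFields() read the long before the float, the values would come back garbled, which is why the field order in both methods has to match.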
So in your mapper you can create and emit a Counter like this:
Counter counter = new Counter(grade, 1);
con.write(new LongWritable(course), counter);
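At that point the first reducer (the combiner) receives a key representing the course and an Iterable over all the Counter values for that course. Because the combiner's output is fed to the final reducer, and Hadoop may run the combiner zero, one, or several times, the combiner should emit a single Counter holding the summed sum and the summed count; the final reducer then adds up those partial Counters the same way and divides sum by count to get the average. Remember to update the type parameters of the mapper and reducer classes so they are consistent with the new value type.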
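The aggregation itself can be sketched in plain Java, independent of the Hadoop API. SumCount and AverageSteps are hypothetical names: combine() models what the combiner does for one course and average() models the final reducer. The partial counts are added (count += v.count) rather than incremented with count++, because after a combiner has run each value may already stand for several grades.

```java
// Hypothetical plain-Java model of the two aggregation steps; no Hadoop types.
final class SumCount {
    final float sum;
    final long count;

    SumCount(float sum, long count) {
        this.sum = sum;
        this.count = count;
    }
}

final class AverageSteps {
    // Combiner step for one course: merges partial (sum, count) pairs into one.
    // Output type equals input type, so it is safe to apply any number of times.
    static SumCount combine(Iterable<SumCount> values) {
        float sum = 0;
        long count = 0;
        for (SumCount v : values) {
            sum += v.sum;
            count += v.count; // add the partial count, not 1
        }
        return new SumCount(sum, count);
    }

    // Final reducer step for one course: merge the partials, then divide.
    static float average(Iterable<SumCount> values) {
        SumCount total = combine(values);
        return total.sum / total.count;
    }
}
```

For grades 80 and 90 on one mapper and 70 on another, combining first gives the partial pairs (170, 2) and (70, 1), and the reducer returns 240 / 3 = 80, the same result as with no combiner at all. Averaging the two partial averages (85 and 70) would instead give 77.5, which is why the combiner emits sums and counts rather than averages.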