Here is my Map:
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Negative limit keeps trailing empty fields in the split
        String[] fields = value.toString().split(",", -20);
        String country = fields[4];
        String numClaims = fields[8];
        // Skip empty claim counts and the quoted header row
        if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
            context.write(new Text(country), new Text(numClaims + ",1"));
        }
    }
}
Here is my Reduce:
public void reduce(Text key, Iterator<Text> values, Context context)
        throws IOException, InterruptedException {
    double sum = 0.0;
    int count = 0;
    while (values.hasNext()) {
        String[] fields = values.next().toString().split(",");
        sum += Double.parseDouble(fields[0]);
        count += Integer.parseInt(fields[1]);
    }
    context.write(new Text(key), new DoubleWritable(sum / count));
}
And here is how the job is configured:
Job job = new Job(getConf());
job.setJarByClass(AverageByAttributeUsingCombiner.class);
job.setJobName("AverageByAttributeUsingCombiner");

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setMapperClass(MapClass.class);
// job.setCombinerClass(Combiner.class);
job.setReducerClass(Reduce.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// job.setNumReduceTasks(0); // to not run the reducer
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
The input is of the form below (counting from zero, fields[4] is COUNTRY and fields[8] is CLAIMS):
"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD│
","SECDLWBD" │
3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,
3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,
The output of the whole MapReduce job looks like this:
"AR"5,1│
"AR"9.1│
"AR"2,1│
"AR"15,1│
"AR"13,1│
"AR"1,1│
"AR"34,1│
"AR"12,1│
"AR"8,1│
"AR"7,1│
"AR"23,1│
"AR"3,1│
"AR"4,1│
"AR"4,1
How can I debug and fix this problem? I am just learning Hadoop.
As has already been said, the problem is that you are not overriding the default reduce() method of the abstract Reducer class, so the default implementation runs instead.
More specifically, your reduce method signature is:
public void reduce(Text key, **Iterator**<Text> values, Context context)
throws IOException, InterruptedException
Instead, it should be:
public void reduce(Text key, **Iterable**<Text> values, Context context)
throws IOException, InterruptedException
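Note that switching the parameter type to Iterable also changes how you iterate: an enhanced for loop replaces the hasNext()/next() calls. A minimal sketch of the corrected method, assuming your existing "claims,count" parsing logic and the org.apache.hadoop.mapreduce.Reducer base class:

@Override
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    double sum = 0.0;
    int count = 0;
    // Iterable<Text> supports the for-each loop directly
    for (Text value : values) {
        String[] fields = value.toString().split(",");
        sum += Double.parseDouble(fields[0]);
        count += Integer.parseInt(fields[1]);
    }
    context.write(key, new DoubleWritable(sum / count));
}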
The old API version is correct because there you implement the reduce() method of the Reducer interface, which is why it works.
A good sanity check for cases like this is to use @Override, since it forces a compile-time check of the method signature.
Your reducer isn't being "picked up". There is most likely a type mismatch or something similar, so your reduce function does not match the abstract class it inherits from... and therefore it is not overriding it. By default, reduce then behaves like the IdentityReducer, which does nothing to the values (which is exactly what you are experiencing).
To make sure you are actually overriding, add @Override:
@Override
public void reduce(Text key, Iterator<Text> values, Context context)
This will raise a compile error because the function signatures do not match. Hopefully that helps you diagnose the problem.
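Once the signature matches, the same @Override annotation compiles cleanly. A sketch of the enclosing class declaration, with the type parameters assumed from your old-API Reduce (Text/Text in, Text/DoubleWritable out):

public static class Reduce extends Reducer<Text, Text, Text, DoubleWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // ... aggregation exactly as in the sketch above ...
    }
}

Note this Reducer must be org.apache.hadoop.mapreduce.Reducer, not org.apache.hadoop.mapred.Reducer; mixing the two packages is a common source of exactly this kind of silent mismatch.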
- I am currently using hadoop-core-1.0.3.jar and trying to write the MapReduce job with the new API; not sure why it is not working
- This program is part of the Hadoop in Action code, and I am learning Hadoop with that book
- When I run the same map reduce program with the old API syntax it runs perfectly fine. The code looks like the following (with the Combiner included; I had tested it without the Combiner first):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class AveragingWithCombiner extends Configured implements Tool {

    public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
        static enum ClaimsCounters { MISSING, QUOTED };

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String fields[] = value.toString().split(",", -20);
            String country = fields[4];
            String numClaims = fields[8];
            if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
                output.collect(new Text(country), new Text(numClaims + ",1"));
            }
        }
    }

    public static class Combine extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            double sum = 0;
            int count = 0;
            while (values.hasNext()) {
                String fields[] = values.next().toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            output.collect(key, new Text(sum + "," + count));
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, DoubleWritable> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
            double sum = 0;
            int count = 0;
            while (values.hasNext()) {
                String fields[] = values.next().toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            output.collect(key, new DoubleWritable(sum / count));
        }
    }

    public int run(String[] args) throws Exception {
        // Configuration processed by ToolRunner
        Configuration conf = getConf();

        // Create a JobConf using the processed conf
        JobConf job = new JobConf(conf, AveragingWithCombiner.class);

        // Process custom command-line options
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        // Specify various job-specific parameters
        job.setJobName("AveragingWithCombiner");
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Submit the job, then poll for progress until the job is complete
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // Let ToolRunner handle generic command-line options
        int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);
        System.exit(res);
    }
}
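As a side note, once the new-API reducer works, the combiner line that is commented out in the new-API driver can be brought back. A hedged sketch of the old-API Combine class above ported to org.apache.hadoop.mapreduce (in the new API a combiner is just a Reducer whose input and output types both match the map output types):

public static class Combine extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        for (Text value : values) {
            String[] fields = value.toString().split(",");
            sum += Double.parseDouble(fields[0]);
            count += Integer.parseInt(fields[1]);
        }
        // Re-emit partial results in the same "sum,count" Text format the mapper produces
        context.write(key, new Text(sum + "," + count));
    }
}

It can then be registered in the driver with job.setCombinerClass(Combine.class);.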