我知道在这里问这个问题是非常愚蠢的。可能是我眼睛不好还是什么。我无法理解为什么我的减速器没有被调用,即使我已经在驱动程序类中配置了它。请帮助我确定我错过某些东西的确切位置。
我的驱动程序类
public class DPDriver {
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
Configuration config = new Configuration();
config.set("mapred.textoutputformat.seperator", "-->");
config.set("fs.file.impl", "com.debajit.assignment.WinLocalFileSystem");
String inputPath="In\input.txt";
Path inPath=new Path(inputPath);
String outputPath = "C:\output\run1";
Path outPath=new Path(outputPath);
Job job = new Job(config,"Tst run");
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(DPMapper.class);
job.setReducerClass(DPReducer.class);
FileInputFormat.setInputPaths(job, inPath );
FileOutputFormat.setOutputPath(job, outPath);
System.out.println(job.waitForCompletion(true));
}
// enter code here
}
我的映射器类
package com.debajit.assignment;
public class DPMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
public void map(LongWritable key, Text vals, Context context)
throws IOException, InterruptedException{
System.out.println(" MAPPER CALLED");
String valString = vals.toString();
String tokens[] = valString.split("\s");
for(int i=0; i<tokens.length;i++){
System.out.println(" for loop "+i);
context.write(new Text(tokens[i]),new IntWritable(1));
}
}
}
我的减速器类
package com.debajit.assignment;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DPReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<Text>vals, Context context)
throws IOException, InterruptedException{
System.out.println(" REDUCER CALLD");
int count=0;
for(Text t: vals){
System.out.println("---- Text-------"+ t.toString());
}
context.write(key, new IntWritable(count));
}
}
您的化简器类定义为:
public class DPReducer extends Reducer<Text, IntWritable, Text, IntWritable>
所以减速器应该有Input: <Text,IntWritable>
和Output: <Text,IntWritable>
但是您已将减速器定义为:
public void reduce(Text key, Iterable<Text>vals, Context context)
期望Input: <Text, Iterable<Text>
与您的 Reduce 类扩展的内容不匹配。
这就是添加@override
批注时出现错误的原因。