In the code below, the reduce method inside the Reducer class never executes. Please help me. In my reduce method I want to write the output to multiple files, which is why I am using MultipleOutputs.
public class DataValidation {

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        int flag = 1;
        boolean result;
        private HashMap<String, FileConfig> fileConfigMaps = new HashMap<String, FileConfig>();
        private HashMap<String, List<LineValidator>> mapOfValidators = new HashMap<String, List<LineValidator>>();
        private HashMap<String, List<Processor>> mapOfProcessors = new HashMap<String, List<Processor>>();

        protected void setup(Context context) throws IOException {
            System.out.println("configure inside map class");
            ConfigurationParser parser = new ConfigurationParser();
            Config config = parser.parse(new Configuration());
            List<FileConfig> file = config.getFiles();
            for (FileConfig f : file) {
                try {
                    fileConfigMaps.put(f.getName(), f);
                    System.out.println("quotes in" + f.isQuotes());
                    System.out.println("file from xml : " + f.getName());
                    ValidationBuilder builder = new ValidationBuilder();
                    // ProcessorBuilder constructor = new ProcessorBuilder();
                    List<LineValidator> validators;
                    validators = builder.build(f);
                    // List<Processor> processors = constructor.build(f);
                    mapOfValidators.put(f.getName(), validators);
                    // mapOfProcessors.put(f.getName(),processors);
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // String filename = ((FileSplit) context.getInputSplit()).getPath()
            // .getName();
            FileSplit fs = (FileSplit) context.getInputSplit();
            String fileName = fs.getPath().getName();
            System.out.println("filename : " + fileName);
            String line = value.toString();
            String[] csvDataArray = null;
            List<LineValidator> lvs = mapOfValidators.get(fileName);
            flag = 1;
            csvDataArray = line.split(",", -1);
            FileConfig fc = fileConfigMaps.get(fileName);
            System.out.println("filename inside fileconfig " + fc.getName());
            System.out.println("quote values" + fc.isQuotes());
            if (fc.isQuotes()) {
                for (int i = 0; i < csvDataArray.length; i++) {
                    csvDataArray[i] = csvDataArray[i].replaceAll("\"", "");
                }
            }
            for (LineValidator lv : lvs) {
                if (flag == 1) {
                    result = lv.validate(csvDataArray, fileName);
                    if (result == false) {
                        String write = line + "," + lv.getFailureDesc();
                        System.out.println("write" + write);
                        System.out.println("key" + new Text(fileName));
                        // output.collect(new Text(filename), new Text(write));
                        context.write(new Text(fileName), new Text(write));
                        flag = 0;
                        if (lv.stopValidation(csvDataArray) == true) {
                            break;
                        }
                    }
                }
            }
        }

        protected void cleanup(Context context) {
            System.out.println("clean up in mapper");
        }
    }
    public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {

        protected void reduce(Text key, Iterator<Text> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("inside reduce method");
            while (values.hasNext()) {
                System.out.println(" Nullwritable value" + NullWritable.get());
                System.out.println("key inside reduce method" + key.toString());
                context.write(NullWritable.get(), values.next());
                // out.write(NullWritable.get(), values.next(), "/user/hadoop/"
                // + context.getJobID() + "/" + key.toString() + "/part-");
            }
        }
    }
    public static void main(String[] args) throws Exception {
        System.out.println("hello");
        Configuration configuration = getConf();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(DataValidation.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }

    private static Configuration getConf() {
        return new Configuration();
    }
}
You are not overriding the reduce method correctly. In the new (mapreduce) API, Reducer.reduce takes an Iterable<Text>, not an Iterator<Text>. Because the signatures differ, your method is just an unrelated overload, so Hadoop falls back to the default identity reduce, and your code never runs. Use this signature (adding an @Override annotation would have caught the mismatch at compile time):

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException
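Since you want the output spread across multiple files, here is a minimal sketch of the corrected Reduce class, assuming org.apache.hadoop.mapreduce.lib.output.MultipleOutputs and reusing the base output path from the commented-out out.write call in your code (that path is your assumption, not something I can verify):

public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> out;

    @Override
    protected void setup(Context context) {
        // Create one MultipleOutputs instance per reduce task
        out = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Base output path adapted from the commented-out line in the question
            out.write(NullWritable.get(), value,
                    "/user/hadoop/" + context.getJobID() + "/" + key.toString() + "/part");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        out.close(); // flushes and closes all the extra outputs
    }
}

If every record goes through MultipleOutputs, you can also call LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class) in the driver so that empty default part files are not created.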