FileAlreadyExistsException when running MapReduce code

This program is supposed to run a MapReduce workflow: the output of the first job must be used as the input of the second job.

When I run it, I get two errors:

  1. Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException
  2. The map phase runs to 100%, but the reducer does not run

Here is my code:

import java.io.IOException;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.LongWritable;
public class MaxPubYear {
    public static class FrequencyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Text word = new Text();
            String delim = ";";
            Integer year = 0;
            String tokens[] = value.toString().split(delim);
            if (tokens.length >= 4) {
                year = TryParseInt(tokens[3].replace("\"", "").trim());
                if (year > 0) {
                    word = new Text(year.toString());
                    context.write(word, new IntWritable(1));
                }
            }
        }
    }
    public static class FrequencyReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
    public static class MaxPubYearMapper extends
            Mapper<LongWritable, Text, IntWritable, Text> {
        public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
            String delim = "\t"; // the first job's TextOutputFormat separates key and value with a tab
            Text valtosend = new Text();
            String tokens[] = value.toString().split(delim);
            if (tokens.length == 2) {
                valtosend.set(tokens[0] + ";" + tokens[1]);
                context.write(new IntWritable(1), valtosend);
            }
        }
    }
    public static class MaxPubYearReducer extends
            Reducer<IntWritable, Text, Text, IntWritable> {
        public void reduce(IntWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            int maxiValue = Integer.MIN_VALUE;
            String maxiYear = "";
            for (Text value : values) {
                String token[] = value.toString().split(";");
                if (token.length == 2
                        && TryParseInt(token[1]).intValue() > maxiValue) {
                    maxiValue = TryParseInt(token[1]);
                    maxiYear = token[0];
                }
            }
            context.write(new Text(maxiYear), new IntWritable(maxiValue));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Frequency");
        job.setJarByClass(MaxPubYear.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(FrequencyMapper.class);
        job.setCombinerClass(FrequencyReducer.class);
        job.setReducerClass(FrequencyReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1] + "_temp"));
        int exitCode = job.waitForCompletion(true) ? 0 : 1;
        if (exitCode == 0) {
            Job SecondJob = new Job(conf, "Maximum Publication year");
            SecondJob.setJarByClass(MaxPubYear.class);
            SecondJob.setOutputKeyClass(Text.class);
            SecondJob.setOutputValueClass(IntWritable.class);
            SecondJob.setMapOutputKeyClass(IntWritable.class);
            SecondJob.setMapOutputValueClass(Text.class);
            SecondJob.setMapperClass(MaxPubYearMapper.class);
            SecondJob.setReducerClass(MaxPubYearReducer.class);
            FileInputFormat.addInputPath(SecondJob, new Path(args[1] + "_temp"));
            FileOutputFormat.setOutputPath(SecondJob, new Path(args[1]));
            System.exit(SecondJob.waitForCompletion(true) ? 0 : 1);
        }
    }
    public static Integer TryParseInt(String trim) {
        // Parse the year string, falling back to 0 for non-numeric input
        try {
            return Integer.parseInt(trim);
        } catch (NumberFormatException e) {
            return 0;
        }
    }
}

线程"main"中出现异常org.apache.hadop.mapred.FileAlreadyExisticsException

A MapReduce job will not overwrite the contents of an existing directory. The output path of an MR job must be a directory that does not yet exist; the job itself creates that directory at the specified path and writes its output files into it.

In your code:

FileOutputFormat.setOutputPath(job, new Path(args[1] + "_temp"));

Make sure this path does not exist before you run the MR job.
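
If you want to rerun the jobs without deleting the old output by hand, a common pattern is to remove any stale output directories from the driver before submitting. Below is a minimal sketch against the standard org.apache.hadoop.fs.FileSystem API, assuming the same conf and args as in your main method (it additionally needs import org.apache.hadoop.fs.FileSystem;):

// Before submitting the first job, remove leftovers from a previous run
FileSystem fs = FileSystem.get(conf);
Path tempPath = new Path(args[1] + "_temp");   // output of the first job
Path finalPath = new Path(args[1]);            // output of the second job
if (fs.exists(tempPath)) {
    fs.delete(tempPath, true);   // true = delete recursively
}
if (fs.exists(finalPath)) {
    fs.delete(finalPath, true);
}

Note that both paths matter on a rerun: the first job fails on args[1] + "_temp", and the second job would fail the same way on args[1].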
