MapReduce chained jobs fail with an exception



I am new to Hadoop and ran into this error while running a MapReduce job. I am trying to compute each person's average marks, and then feed the output of the first job into a second job that assigns grades. I understand what the exception says, but I can't figure out where I went wrong.

Here is the exception:

    15/07/02 23:53:36 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
    15/07/02 23:53:36 INFO input.FileInputFormat: Total input paths to process : 1
    15/07/02 23:53:38 INFO mapred.JobClient: Running job: job_201507022153_0026
    15/07/02 23:53:39 INFO mapred.JobClient:  map 0% reduce 0%
    15/07/02 23:53:44 INFO mapred.JobClient: Task Id : attempt_201507022153_0026_m_000000_0, Status : FAILED
    java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.DoubleWritable
        at com.hadoop.mrchain.Driver$Mapper2.map(Driver.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:266)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1278)
        at org.apache.hadoop.mapred.Child.main(Child.java:260)

My code:

package com.hadoop.mrchain;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Driver {
    /*
     * Mapper1
     */
    public static class Mapper1 extends
            Mapper<Object, Text, Text, DoubleWritable> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            String studentName = itr.nextToken();
            Double marks = Double.parseDouble(itr.nextToken());
            context.write(new Text(studentName), new DoubleWritable(marks));
        }
    }
    /*
     * Mapper2
     */
    public static class Mapper2 extends
            Mapper<Object, DoubleWritable, Text, DoubleWritable> {
        public void map(Object key, DoubleWritable value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            context.write(new Text(itr.nextToken()), new DoubleWritable(Double
                    .parseDouble(itr.nextToken().toString())));
        }
    }
    /*
     * Reducer1
     */
    public static class Reducer1 extends
            Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        public void reduce(Text key, Iterable<DoubleWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            for (DoubleWritable val : values) {
                sum += val.get();
                count++;
            }
            double avg = sum / count;
            context.write(key, new DoubleWritable(avg));
        }
    }
    /*
     * Reducer2
     */
    public static class Reducer2 extends
            Reducer<Text, DoubleWritable, Text, Text> {
        public void reduce(Text key, Iterable<DoubleWritable> values,
                Context context) throws IOException, InterruptedException {
            for (DoubleWritable val : values) {
                // double marks = Double.parseDouble(val.toString());
                int marks = ((Double) val.get()).intValue();
                if (marks >= 70) {
                    context.write(key, new Text("GradeA"));
                } else if (marks >= 60 && marks < 70) {
                    context.write(key, new Text("GradeB"));
                } else if (marks < 60 && marks >= 40) {
                    context.write(key, new Text("GradeC"));
                } else {
                    context.write(key, new Text("FAIL"));
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        cleanFileSystem(conf, args);
        Job job1 = new Job(conf, "BATCH51-MRCHAIN-JOB1");
        job1.setJarByClass(Driver.class);
        job1.setMapperClass(Mapper1.class);
        job1.setCombinerClass(Reducer1.class);
        job1.setReducerClass(Reducer1.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));
        job1.waitForCompletion(true);
        // Job2
        Job job2 = new Job(conf, "BATCH51-MRCHAIN-JOB2");
        job2.setJarByClass(Driver.class);
        job2.setMapperClass(Mapper2.class);
        job2.setCombinerClass(Reducer2.class);
        job2.setReducerClass(Reducer2.class);
        // job2.setMapOutputKeyClass(Text.class);
        // job2.setMapOutputValueClass(DoubleWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
    private static void cleanFileSystem(Configuration conf, String[] args)
            throws Exception {
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        if (fs.exists(new Path(args[2]))) {
            fs.delete(new Path(args[2]), true);
        }
        // if (fs.exists(new Path(args[3]))) {
        // fs.delete(new Path(args[3]), true);
        // }
    }
}

Sample input:

hello 90
suresh 80
krishna 16
ramesh 55
santosh 82
anji 66
gopal 88
hello99
suresh 80
krishna 16
gopal 91
hello 91
suresh 80
krishna 86
ramesh 55
santosh 82
anji 66
gopal 95

It fails to cast some of these strings to double; for example, hello cannot be cast to a double. You need to change the logic in your mapper to solve this.
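One way to do that is to skip records that do not look like a name followed by a numeric mark; the sketch below is just one option, assuming such records can safely be dropped:

// Sketch only: same Mapper1 body as in the question, but skipping malformed
// records (e.g. "hello99", which has no separate mark token) instead of
// letting the parse fail.
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    if (itr.countTokens() < 2) {
        return; // no separate mark on this line, drop it
    }
    String studentName = itr.nextToken();
    double marks;
    try {
        marks = Double.parseDouble(itr.nextToken());
    } catch (NumberFormatException e) {
        return; // second token is not a number, drop the record
    }
    context.write(new Text(studentName), new DoubleWritable(marks));
}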

There are two problems in the posted code that need to be fixed:

  1. We need to make sure the second mapper can correctly read the output produced by the first MapReduce job. Since the input format in use is the default TextInputFormat, the framework reads keys and values as LongWritable, Text. Here the code tries to fit a value of type Text into the type DoubleWritable, hence the exception. To fix this, we need to make sure Text goes into Text.

  2. The given reducer class cannot be used as a combiner as-is, because a combiner's output is fed into the reducer. In this scenario the combiner (Reducer2) emits Text, Text, which is not the key/value type the reducer expects (Text, DoubleWritable).

Here are the changes needed to make the code work:

Mapper<LongWritable, Text, Text, DoubleWritable> { //Changed in mapper2 defn
//Changes in Driver main method
job1.setInputFormatClass(TextInputFormat.class); //added
job1.setOutputFormatClass(TextOutputFormat.class); //added
//job2.setCombinerClass(Reducer2.class);  //commented
job2.setMapOutputKeyClass(Text.class);   //un-commented
job2.setMapOutputValueClass(DoubleWritable.class); //un-commented
job2.setInputFormatClass(TextInputFormat.class); //added
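For completeness, here is a sketch of what Mapper2 could look like after the signature change, assuming job1 wrote its output with the default TextOutputFormat (one tab-separated name/average pair per line):

// Sketch: reads the text output of job1 and re-emits it as
// Text / DoubleWritable so Reducer2 gets the types it expects.
// Also needs: import org.apache.hadoop.io.LongWritable;
public static class Mapper2 extends
        Mapper<LongWritable, Text, Text, DoubleWritable> {
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        String studentName = itr.nextToken();
        double avg = Double.parseDouble(itr.nextToken());
        context.write(new Text(studentName), new DoubleWritable(avg));
    }
}

TextInputFormat and TextOutputFormat used in the driver changes come from org.apache.hadoop.mapreduce.lib.input and org.apache.hadoop.mapreduce.lib.output respectively, so the corresponding imports are needed as well.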

Hope this helps.
