自定义 ArrayWritable 类的 toString() 方法中出现 NullPointerException(MapReduce)



我试图使用 Time_Ant10s(自定义 ArrayWritable 类)作为 Reducer 的输出。

我参考了这个很好的问题:《MapReduce 输出 ArrayWritable》,但在 Reducer 最后一行的 context.write() 处遇到了 NullPointerException。

我猜 Time_Ant10s.toString() 中调用的 get() 可能返回了 null,但我不知道为什么会发生这种情况。你能帮我吗?

主要方法

/**
 * Configures the MapReduce job and runs it to completion.
 *
 * <p>The job uses {@code Mapper1} and {@code Reducer1} with a single reduce task,
 * reads text input, and writes {@code IntWritable}/{@code Time_Ant10s} records
 * through {@code CommaTextOutputFormat}.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
 * @throws Exception if job configuration or execution fails
 */
public static void main(String[] args) throws Exception {
    // Fail fast with a usage message; without this check a missing argument
    // surfaces as an ArrayIndexOutOfBoundsException at the path setup below.
    if (args.length < 2) {
        System.err.println("Usage: CommutingTime1 <input path> <output path>");
        System.exit(2);
    }

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "something");
    // general
    job.setJarByClass(CommutingTime1.class);
    job.setMapperClass(Mapper1.class);
    job.setReducerClass(Reducer1.class);
    job.setNumReduceTasks(1);
    job.setInputFormatClass(TextInputFormat.class);
    // mapper output
    job.setMapOutputKeyClass(Date_Uid.class);
    job.setMapOutputValueClass(Time_Ant10.class);
    // reducer output
    job.setOutputFormatClass(CommaTextOutputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Time_Ant10s.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Exit status mirrors the job outcome: 0 on success, 1 on failure.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Mapper

/**
 * Mapper declared with LongWritable/Text input and Date_Uid/Time_Ant10 output.
 * The map implementation was left out of the post.
 */
public static class Mapper1 extends Mapper<LongWritable, Text, Date_Uid, Time_Ant10> {
    /* map as <date_uid, time_ant10> */
    // omitted
    // NOTE(review): the brace below presumably closes the omitted map method.
    }
}

Reducer

/**
 * Reducer that gathers the Time_Ant10 values of one key into a Time_Ant10s array
 * and writes it under the key's date.
 *
 * NOTE(review): this is the version that produces the NullPointerException shown in
 * the error message. The author's follow-up attributes it to iterating the values
 * Iterable twice; the revised reducer later in the post collects values in one pass.
 */
public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> {
    /* <date_uid, time_ant10> -> <date, time_ant10s> */
    // Output key instance, overwritten on every reduce call.
    private IntWritable date = new IntWritable();
    /**
     * Counts the values for the key, then copies them into an array of that size
     * and writes the array with the key's date.
     *
     * @param date_uid    input key; only its date is used for the output key
     * @param time_ant10s values grouped under the key
     * @param context     receives the (date, Time_Ant10s) record
     */
    @Override
    protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context) throws IOException, InterruptedException {
        date.set(date_uid.getDate());
        // First pass: count the values so the array can be sized.
        int num = 0;
        for(Time_Ant10 time_ant10 : time_ant10s){
            num++;
        }
        if(num>=1){
            Time_Ant10[] temp = new Time_Ant10[num];
            int i=0;
            // NOTE(review): second pass over the same Iterable. Per the author's
            // follow-up it cannot be iterated twice, so presumably this loop body
            // never runs and temp keeps its null elements, which Time_Ant10s.toString()
            // then dereferences — confirm.
            for(Time_Ant10 time_ant10 : time_ant10s){
                String time = time_ant10.getTimeStr();
                int ant10 = time_ant10.getAnt10();
                // Store a fresh copy built from the value's time string and ant10.
                temp[i] = new Time_Ant10(time, ant10);
                i++;
            }
            context.write(date, new Time_Ant10s(temp));
        }
    }
}

/**
 * Text output format for IntWritable keys and Time_Ant10s values that writes to a
 * ".txt" work file and uses "," as the key/value separator.
 */
public static class CommaTextOutputFormat extends TextOutputFormat<IntWritable, Time_Ant10s> {
    /**
     * Creates the task's default work file with a ".txt" extension and returns a
     * line record writer over it.
     *
     * @param job task attempt context supplying the configuration and work file location
     * @return a writer that separates key and value with ","
     * @throws IOException if the output file cannot be created
     */
    @Override
    public RecordWriter<IntWritable, Time_Ant10s> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration configuration = job.getConfiguration();
        Path workFile = getDefaultWorkFile(job, ".txt");
        // NOTE(review): the false argument is presumably the overwrite flag — verify against FileSystem.create.
        FSDataOutputStream stream = workFile.getFileSystem(configuration).create(workFile, false);
        return new LineRecordWriter<>(stream, ",");
    }
}

自定义 Writable 类

// Time
/**
 * Time of day held as separate hour, minute and second fields, serialized as
 * three consecutive ints (hour, minute, second).
 */
public static class Time implements Writable {
    private int h, m, s;

    /** Creates a time whose fields all have the default value 0. */
    public Time() {}

    /**
     * @param h hour
     * @param m minute
     * @param s second
     */
    public Time(int h, int m, int s) {
        this.h = h;
        this.m = m;
        this.s = s;
    }

    /**
     * Creates a time from a colon-separated string.
     *
     * @param time text of the form {@code h:m:s}
     * @throws IllegalArgumentException if fewer than three parts are present or a
     *         part is not an integer ({@code NumberFormatException})
     */
    public Time(String time) {
        int[] parsed = parse(time);
        this.h = parsed[0];
        this.m = parsed[1];
        this.s = parsed[2];
    }

    /** Replaces all three fields. */
    public void set(int h, int m, int s) {
        this.h = h;
        this.m = m;
        this.s = s;
    }

    /**
     * Replaces all three fields from a colon-separated string. The string is fully
     * parsed before any field is assigned, so a malformed value leaves this object
     * unchanged.
     *
     * @param time text of the form {@code h:m:s}
     * @throws IllegalArgumentException if fewer than three parts are present or a
     *         part is not an integer ({@code NumberFormatException})
     */
    public void set(String time) {
        int[] parsed = parse(time);
        this.h = parsed[0];
        this.m = parsed[1];
        this.s = parsed[2];
    }

    /** @return a new three-element array {hour, minute, second} */
    public int[] getTime() {
        return new int[] {this.h, this.m, this.s};
    }

    /** @return the time formatted as zero-padded {@code hh:mm:ss} */
    public String getTimeStr() {
        return String.format("%1$02d:%2$02d:%3$02d", this.h, this.m, this.s);
    }

    /** @return the time packed as {@code h * 10000 + m * 100 + s} */
    public int getTimeInt() {
        return this.h * 10000 + this.m * 100 + this.s;
    }

    /** Reads hour, minute and second as three ints, in that order. */
    @Override
    public void readFields(DataInput in) throws IOException {
        h = in.readInt();
        m = in.readInt();
        s = in.readInt();
    }

    /** Writes hour, minute and second as three ints, in that order. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(h);
        out.writeInt(m);
        out.writeInt(s);
    }

    /**
     * Splits {@code h:m:s} text into its three integer parts. Parts beyond the
     * third are ignored.
     */
    private static int[] parse(String time) {
        String[] parts = time.split(":", 0);
        if (parts.length < 3) {
            throw new IllegalArgumentException("Expected time as h:m:s but got: " + time);
        }
        return new int[] {
            Integer.parseInt(parts[0]),
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2])
        };
    }
}
// Time_Ant10
/**
 * Pairs a {@code Time} with an integer {@code ant10} value; serialized as the
 * time followed by the int.
 */
public static class Time_Ant10 implements Writable {
    private Time time;
    private int ant10;

    /** Creates an instance holding a default {@code Time} and an ant10 of 0. */
    public Time_Ant10() {
        this(new Time(), 0);
    }

    /** Creates an instance that keeps a reference to the given time. */
    public Time_Ant10(Time time, int ant10) {
        this.time = time;
        this.ant10 = ant10;
    }

    /** Creates an instance whose time is built from the given time string. */
    public Time_Ant10(String time, int ant10) {
        this(new Time(time), ant10);
    }

    /** Replaces both the time reference and the ant10 value. */
    public void set(Time time, int ant10) {
        this.time = time;
        this.ant10 = ant10;
    }

    /** Replaces the time with a new one built from the string, and the ant10 value. */
    public void set(String time, int ant10) {
        Time parsed = new Time(time);
        this.time = parsed;
        this.ant10 = ant10;
    }

    /** @return the result of the wrapped time's {@code getTime()} */
    public int[] getTime() {
        return time.getTime();
    }

    /** @return the result of the wrapped time's {@code getTimeStr()} */
    public String getTimeStr() {
        return time.getTimeStr();
    }

    /** @return the result of the wrapped time's {@code getTimeInt()} */
    public int getTimeInt() {
        return time.getTimeInt();
    }

    /** @return the ant10 value */
    public int getAnt10() {
        return ant10;
    }

    /** Reads the time fields first, then the ant10 int. */
    @Override
    public void readFields(DataInput in) throws IOException {
        time.readFields(in);
        ant10 = in.readInt();
    }

    /** Writes the time fields first, then the ant10 int. */
    @Override
    public void write(DataOutput out) throws IOException {
        time.write(out);
        out.writeInt(ant10);
    }
}
// Time_Ant10s
/**
 * ArrayWritable specialised for {@code Time_Ant10} elements, with a comma-separated
 * text form.
 */
public static class Time_Ant10s extends ArrayWritable {
    /** Creates an instance for {@code Time_Ant10} elements without supplying an array. */
    public Time_Ant10s() {
        super(Time_Ant10.class);
    }

    /** Creates an instance wrapping the given array. */
    public Time_Ant10s(Time_Ant10[] time_ant10s) {
        super(Time_Ant10.class, time_ant10s);
    }

    /** @return the superclass's array, cast to {@code Time_Ant10[]} */
    @Override
    public Time_Ant10[] get() {
        return (Time_Ant10[]) super.get();
    }

    /**
     * Renders every element as {@code <timeInt>,<ant10>,} and concatenates them, so
     * a non-empty result ends with a comma.
     */
    @Override
    public String toString() {
        Time_Ant10[] entries = get();
        String text = "";
        for (int idx = 0; idx < entries.length; idx++) {
            Time_Ant10 entry = entries[idx];
            int timeValue = entry.getTimeInt();
            int antValue = entry.getAnt10();
            text = text + (timeValue + "," + antValue + ",");
        }
        return text;
    }
}
// Date_Uid
/**
 * Key type compared against other Date_Uid instances; the job uses it as the map
 * output key and the reducer reads its date via getDate(). Body omitted in the post.
 */
public static class Date_Uid implements WritableComparable<Date_Uid> { 
    // omitted
}
错误消息

java.lang.Exception: java.lang.NullPointerException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.NullPointerException
    at CommutingTime1$Time_Ant10s.toString(CommutingTime1.java:179)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.writeObject(TextOutputFormat.java:85)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.write(TextOutputFormat.java:104)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
    at CommutingTime1$Reducer1.reduce(CommutingTime1.java:323)
    at CommutingTime1$Reducer1.reduce(CommutingTime1.java:291)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

我发现问题在于 reduce 中的 Iterable 不能迭代两次。所以我参考这个页面,将 Reducer 和 Time_Ant10s 更改如下。现在一切都很顺利。

@redflar3:非常感谢你给我的提示。我完全误解了我的代码哪里有错误。

Reducer

/**
 * Reducer that copies the Time_Ant10 values of one key into a list in a single
 * pass and writes them as one Time_Ant10s under the key's date.
 */
public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> {
    // Output key instance, overwritten on every reduce call.
    private IntWritable date = new IntWritable();

    /**
     * Emits one (date, Time_Ant10s) record when the key has at least one value;
     * emits nothing otherwise.
     *
     * @param date_uid    input key; only its date is used for the output key
     * @param time_ant10s values grouped under the key, iterated exactly once
     * @param context     receives the output record
     */
    @Override
    protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context) throws IOException, InterruptedException {
        date.set(date_uid.getDate());

        ArrayList<Time_Ant10> collected = new ArrayList<>();
        for (Time_Ant10 value : time_ant10s) {
            // Store a fresh copy rebuilt from the value's time string and ant10.
            collected.add(new Time_Ant10(value.getTimeStr(), value.getAnt10()));
        }

        if (collected.isEmpty()) {
            return;
        }
        Time_Ant10[] asArray = collected.toArray(new Time_Ant10[collected.size()]);
        context.write(date, new Time_Ant10s(asArray));
    }
}

Time_Ant10s

/**
 * ArrayWritable specialised for {@code Time_Ant10} elements, with a comma-separated
 * text form.
 */
public static class Time_Ant10s extends ArrayWritable {
    /** Creates an instance for {@code Time_Ant10} elements without supplying an array. */
    public Time_Ant10s(){
        super(Time_Ant10.class);
    }

    /** Creates an instance wrapping the given array. */
    public Time_Ant10s(Time_Ant10[] time_ant10s){
        super(Time_Ant10.class, time_ant10s);
    }

    /** @return the superclass's array, cast to {@code Time_Ant10[]} */
    @Override
    public Time_Ant10[] get() {
        return (Time_Ant10[]) super.get();
    }

    /**
     * Renders every element as {@code <timeInt>,<ant10>,} and concatenates them, so
     * a non-empty result ends with a comma.
     *
     * @return the concatenated text, or an empty string for an empty array
     */
    @Override
    public String toString() {
        // StringBuilder avoids re-copying the accumulated text on every element.
        StringBuilder output = new StringBuilder();
        for (Time_Ant10 time_ant10 : get()) {
            output.append(time_ant10.getTimeInt())
                  .append(',')
                  .append(time_ant10.getAnt10())
                  .append(',');
        }
        return output.toString();
    }
}

相关内容

  • 没有找到相关文章

最新更新