我试图使用Time_Ant10s
(自定义ArrayWritable类)作为减速机的输出。
我指的是这个很好的问题:MapReduce输出ArrayWritable,但我得到NullPointerException在context.write()
在减速机的最后一行。
我想get()
在Time_Ant10s.toString()
可能返回null,但我不知道为什么会发生这种情况。你能帮我吗?
主要方法
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "something");
// general
job.setJarByClass(CommutingTime1.class);
job.setMapperClass(Mapper1.class);
job.setReducerClass(Reducer1.class);
job.setNumReduceTasks(1);
job.setInputFormatClass (TextInputFormat.class);
// mapper output
job.setMapOutputKeyClass(Date_Uid.class);
job.setMapOutputValueClass(Time_Ant10.class);
// reducer output
job.setOutputFormatClass(CommaTextOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Time_Ant10s.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Mapper
public static class Mapper1 extends Mapper<LongWritable, Text, Date_Uid, Time_Ant10> {
/* map as <date_uid, time_ant10> */
// omitted
}
}
减速器
public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> {
/* <date_uid, time_ant10> -> <date, time_ant10s> */
private IntWritable date = new IntWritable();
@Override
protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context) throws IOException, InterruptedException {
date.set(date_uid.getDate());
// count ants
int num = 0;
for(Time_Ant10 time_ant10 : time_ant10s){
num++;
}
if(num>=1){
Time_Ant10[] temp = new Time_Ant10[num];
int i=0;
for(Time_Ant10 time_ant10 : time_ant10s){
String time = time_ant10.getTimeStr();
int ant10 = time_ant10.getAnt10();
temp[i] = new Time_Ant10(time, ant10);
i++;
}
context.write(date, new Time_Ant10s(temp));
}
}
}
public static class CommaTextOutputFormat extends TextOutputFormat<IntWritable, Time_Ant10s> {
@Override
public RecordWriter<IntWritable, Time_Ant10s> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
String extension = ".txt";
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<IntWritable, Time_Ant10s>(fileOut, ",");
}
}
自定义可写的
// Time
public static class Time implements Writable {
private int h, m, s;
public Time() {}
public Time(int h, int m, int s) {
this.h = h;
this.m = m;
this.s = s;
}
public Time(String time) {
String[] hms = time.split(":", 0);
this.h = Integer.parseInt(hms[0]);
this.m = Integer.parseInt(hms[1]);
this.s = Integer.parseInt(hms[2]);
}
public void set(int h, int m, int s) {
this.h = h;
this.m = m;
this.s = s;
}
public void set(String time) {
String[] hms = time.split(":", 0);
this.h = Integer.parseInt(hms[0]);
this.m = Integer.parseInt(hms[1]);
this.s = Integer.parseInt(hms[2]);
}
public int[] getTime() {
int[] time = new int[3];
time[0] = this.h;
time[1] = this.m;
time[2] = this.s;
return time;
}
public String getTimeStr() {
return String.format("%1$02d:%2$02d:%3$02d", this.h, this.m, this.s);
}
public int getTimeInt() {
return this.h * 10000 + this.m * 100 + this.s;
}
@Override
public void readFields(DataInput in) throws IOException {
h = in.readInt();
m = in.readInt();
s = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(h);
out.writeInt(m);
out.writeInt(s);
}
}
// Time_Ant10
public static class Time_Ant10 implements Writable {
private Time time;
private int ant10;
public Time_Ant10() {
this.time = new Time();
}
public Time_Ant10(Time time, int ant10) {
this.time = time;
this.ant10 = ant10;
}
public Time_Ant10(String time, int ant10) {
this.time = new Time(time);
this.ant10 = ant10;
}
public void set(Time time, int ant10) {
this.time = time;
this.ant10 = ant10;
}
public void set(String time, int ant10) {
this.time = new Time(time);
this.ant10 = ant10;
}
public int[] getTime() {
return this.time.getTime();
}
public String getTimeStr() {
return this.time.getTimeStr();
}
public int getTimeInt() {
return this.time.getTimeInt();
}
public int getAnt10() {
return this.ant10;
}
@Override
public void readFields(DataInput in) throws IOException {
time.readFields(in);
ant10 = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
time.write(out);
out.writeInt(ant10);
}
}
// Time_Ant10s
public static class Time_Ant10s extends ArrayWritable {
public Time_Ant10s(){
super(Time_Ant10.class);
}
public Time_Ant10s(Time_Ant10[] time_ant10s){
super(Time_Ant10.class, time_ant10s);
}
@Override
public Time_Ant10[] get() {
return (Time_Ant10[]) super.get();
}
@Override
public String toString() {
int time, ant10;
Time_Ant10[] time_ant10s = get();
String output = "";
for(Time_Ant10 time_ant10: time_ant10s){
time = time_ant10.getTimeInt();
ant10 = time_ant10.getAnt10();
output += time + "," + ant10 + ",";
}
return output;
}
}
// Data_Uid
public static class Date_Uid implements WritableComparable<Date_Uid> {
// omitted
}
错误消息java.lang.Exception: java.lang.NullPointerException
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.NullPointerException
at CommutingTime1$Time_Ant10s.toString(CommutingTime1.java:179)
at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.writeObject(TextOutputFormat.java:85)
at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.write(TextOutputFormat.java:104)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at CommutingTime1$Reducer1.reduce(CommutingTime1.java:323)
at CommutingTime1$Reducer1.reduce(CommutingTime1.java:291)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我发现问题是reduce
中的Iterable
不能迭代两次。所以我参考这个页面,将减速器和Time_Ant10s
更改如下。现在一切都很顺利。
减速器
public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> {
private IntWritable date = new IntWritable();
@Override
protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context) throws IOException, InterruptedException {
String time = "";
int ant10;
date.set(date_uid.getDate());
ArrayList<Time_Ant10> temp_list = new ArrayList<Time_Ant10>();
for (Time_Ant10 time_ant10 : time_ant10s){
time = time_ant10.getTimeStr();
ant10 = time_ant10.getAnt10();
temp_list.add(new Time_Ant10(time, ant10));
}
if(temp_list.size() >= 1){
Time_Ant10[] temp_array = temp_list.toArray(new Time_Ant10[temp_list.size()]);
context.write(date, new Time_Ant10s(temp_array));
}
}
}
Time_Ant10s
public static class Time_Ant10s extends ArrayWritable {
public Time_Ant10s(){
super(Time_Ant10.class);
}
public Time_Ant10s(Time_Ant10[] time_ant10s){
super(Time_Ant10.class, time_ant10s);
}
@Override
public Time_Ant10[] get() {
return (Time_Ant10[]) super.get();
}
@Override
public String toString() {
int time, ant10;
Time_Ant10[] time_ant10s = get();
String output = "";
for(Time_Ant10 time_ant10: time_ant10s){
time = time_ant10.getTimeInt();
ant10 = time_ant10.getAnt10();
output += time + "," + ant10 + ",";
}
return output;
}
}