Reduce未运行,但作业已成功完成



首先,我是Hadoop MapReduce的新手。我的化简器未运行,但显示作业已成功完成。以下是我的控制台输出:

    • 信息地图减少。作业:正在运行 作业:job_1418240815217_0015
    • 信息地图减少。作业:在优步模式下运行的作业job_1418240815217_0015:假
    • 信息地图减少。作业:映射 0% 减少 0%
    • 信息地图减少。作业:映射 100% 减少 0%
    • 信息地图减少。作业:作业job_1418240815217_0015成功完成
    • 信息地图减少。职位:计数器:30

主要类是:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    @SuppressWarnings("deprecation")
    Job job = new Job(conf,"NPhase2");
    job.setJarByClass(NPhase2.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(NPhase2Value.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);        
    job.setMapperClass(MapClass.class);        
    job.setReducerClass(Reduce.class);
    int numberOfPartition = 0;  
    List<String> other_args = new ArrayList<String>();
    for(int i = 0; i < args.length; ++i) 
    {
        try {
            if ("-m".equals(args[i])) {
                //conf.setNumMapTasks(Integer.parseInt(args[++i]));
                ++i;
            } else if ("-r".equals(args[i])) {
                job.setNumReduceTasks(Integer.parseInt(args[++i]));
            } else if ("-k".equals(args[i])) {
                int knn = Integer.parseInt(args[++i]);
                conf.setInt("knn", knn);
                System.out.println(knn);
            } else {
                other_args.add(args[i]);
            }
            job.setNumReduceTasks(numberOfPartition * numberOfPartition);
            //conf.setNumReduceTasks(1);
        } catch (NumberFormatException except) {
            System.out.println("ERROR: Integer expected instead of " + args[i]);
        } catch (ArrayIndexOutOfBoundsException except) {
            System.out.println("ERROR: Required parameter missing from " + args[i-1]);
        }
    } 
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
        System.out.println("ERROR: Wrong number of parameters: " +
            other_args.size() + " instead of 2.");
    }
    FileInputFormat.setInputPaths(job, other_args.get(0));
    FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

我的映射器是:

公共静态类 MapClass 扩展映射器 {

    public void map(LongWritable key, Text value, Context context)  throws IOException, InterruptedException 
    {
        String line = value.toString();
        String[] parts = line.split("\s+");
        // key format <rid1>
        IntWritable mapKey = new IntWritable(Integer.valueOf(parts[0]));
        // value format <rid2, dist>
        NPhase2Value np2v = new NPhase2Value(Integer.valueOf(parts[1]), Float.valueOf(parts[2]));
        context.write(mapKey, np2v);
    }
}

我的减速器类是:

public static class Reduce extends Reducer<IntWritable, NPhase2Value, NullWritable, Text> 
    {
        int numberOfPartition;  
        int knn;
        class Record 
    {
        public int id2;
        public float dist;
        Record(int id2, float dist) 
        {
            this.id2 = id2;
            this.dist = dist;
        }
        public String toString() 
        {
            return Integer.toString(id2) + " " + Float.toString(dist);  
        } 
    }
    class RecordComparator implements Comparator<Record> 
    {
        public int compare(Record o1, Record o2) 
        {
            int ret = 0;
            float dist = o1.dist - o2.dist;
            if (Math.abs(dist) < 1E-6)
                ret = o1.id2 - o2.id2;
            else if (dist > 0)
                ret = 1;
            else 
                ret = -1;
            return -ret;
        }   
    }
    public void setup(Context context) 
    {
        Configuration conf = new Configuration();
        conf = context.getConfiguration();
        numberOfPartition = conf.getInt("numberOfPartition", 2);    
        knn = conf.getInt("knn", 3);
    }   
    public void reduce(IntWritable key, Iterator<NPhase2Value> values, Context context) throws IOException, InterruptedException 
    {
        //initialize the pq
        RecordComparator rc = new RecordComparator();
        PriorityQueue<Record> pq = new PriorityQueue<Record>(knn + 1, rc);
        // For each record we have a reduce task
        // value format <rid1, rid2, dist>
        while (values.hasNext()) 
        {
            NPhase2Value np2v = values.next();
            int id2 = np2v.getFirst().get();
            float dist = np2v.getSecond().get();
            Record record = new Record(id2, dist);
            pq.add(record);
            if (pq.size() > knn)
                pq.poll();
        }
        while(pq.size() > 0) 
        {
            context.write(NullWritable.get(), new Text(key.toString() + " " + pq.poll().toString()));
            //break; // only ouput the first record
        }
    } // reduce
}

这是我的助手类:

公共类 NPhase2Value 实现 WritableComparable {

private IntWritable first;
private FloatWritable second;
public NPhase2Value() {
    set(new IntWritable(), new FloatWritable());
}
public NPhase2Value(int first, float second) {
    set(new IntWritable(first), new FloatWritable(second));
}
public void set(IntWritable first, FloatWritable second) {
    this.first = first;
    this.second = second;   
}
public IntWritable getFirst() {
    return first;
}
public FloatWritable getSecond() {
    return second;
}
@Override
public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
}
@Override
public boolean equals(Object o) {
    if (o instanceof NPhase2Value) {
        NPhase2Value np2v = (NPhase2Value) o;
        return first.equals(np2v.first) && second.equals(np2v.second);
    }
    return false;
}
@Override
public String toString() {
    return first.toString() + " " + second.toString();
}
@Override
public int compareTo(NPhase2Value np2v) {
    return 1;
}
}

我使用的命令行命令是:

hadoop jar knn.jar NPhase2 -m 1 -r 3 -k 4 phase1out phase2out

我正在努力找出错误,但仍然无法提出解决方案。请在这方面帮助我,因为我的日程安排很紧。

因为您已将化简器任务的数量设置为 0。看到这个:

 int numberOfPartition = 0;  
 //.......
 job.setNumReduceTasks(numberOfPartition * numberOfPartition);

我没有看到您在代码中的任何地方重置了numberOfPartition。我精简了您应该在解析 -r 选项的位置设置它,或者完全删除对 setNumReduceTasks 方法的调用,因为您在解析 -r 选项时已经设置了它。

相关内容

  • 没有找到相关文章

最新更新