Hadoop正在完全跳过reduce阶段



我已经设置了一个Hadoop作业,如下所示:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Legion");
    job.setJarByClass(Legion.class);
    job.setMapperClass(CallQualityMap.class);
    job.setReducerClass(CallQualityReduce.class);
    // Explicitly configure map and reduce outputs, since they're different classes
    job.setMapOutputKeyClass(CallSampleKey.class);
    job.setMapOutputValueClass(CallSample.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(CombineRepublicInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    CombineRepublicInputFormat.setMaxInputSplitSize(job, 128000000);
    CombineRepublicInputFormat.setInputDirRecursive(job, true);
    CombineRepublicInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(true);
}

这项工作完成了,但发生了一些奇怪的事情。每个输入行有一个输出行。每个输出行由CallSampleKey.toString()方法的输出组成,然后是一个选项卡,然后是类似CallSample@17ab34d的输出。

这意味着reduce阶段从未运行,并且CallSampleKeyCallSample被直接传递到TextOutputFormat。但我不明白为什么会出现这种情况。我已经非常清楚地指定了job.setReducerClass(CallQualityReduce.class);,所以我不知道为什么它会跳过减速器!

编辑:这是减速器的代码:

public static class CallQualityReduce extends Reducer<CallSampleKey, CallSample, NullWritable, Text> {
    public void reduce(CallSampleKey inKey, Iterator<CallSample> inValues, Context context) throws IOException, InterruptedException {
        Call call = new Call(inKey.getId().toString(), inKey.getUuid().toString());
        while (inValues.hasNext()) {
            call.addSample(inValues.next());
        }
        context.write(NullWritable.get(), new Text(call.getStats()));
    }
}

如果您试图更改

public void reduce(CallSampleKey inKey, Iterator<CallSample> inValues, Context context) throws IOException, InterruptedException {

使用Iterable而不是Iterator

public void reduce(CallSampleKey inKey, Iterable<CallSample> inValues, Context context) throws IOException, InterruptedException {

然后您必须使用inValues.iterator()来获得实际的迭代器。

如果方法签名不匹配,那么它只是通过默认的身份减少器实现。也许不幸的是,底层的默认实现并不能使检测这种类型的拼写错误变得容易,但接下来最好的事情是在所有要重写的方法中始终使用@Override,这样编译器就可以提供帮助。

相关内容

  • 没有找到相关文章

最新更新