reduce阶段的输入不是我在Hadoop(Java)中期望的



我正在使用MapReduce在Hadoop中开发一个非常简单的图形分析工具。我有一个如下所示的图表(每行代表和边缘 - 实际上,这是一个三角形图):

1 3
3 1
3 2
2 3

现在,我想使用MapReduce来计算此图中的三角形(显然是一个)。它仍在进行中,在第一阶段,我尝试获取每个顶点的所有邻居的列表。

我的主要类如下所示:

public class TriangleCount {
    public static void main( String[] args ) throws Exception {
        // remove the old output directory
        FileSystem fs = FileSystem.get(new Configuration());
        fs.delete(new Path("output/"), true);
        JobConf firstPhaseJob = new JobConf(FirstPhase.class);
        firstPhaseJob.setOutputKeyClass(IntWritable.class);
        firstPhaseJob.setOutputValueClass(IntWritable.class);
        firstPhaseJob.setMapperClass(FirstPhase.Map.class);
        firstPhaseJob.setCombinerClass(FirstPhase.Reduce.class);
        firstPhaseJob.setReducerClass(FirstPhase.Reduce.class);
        FileInputFormat.setInputPaths(firstPhaseJob, new Path("input/"));
        FileOutputFormat.setOutputPath(firstPhaseJob, new Path("output/"));
        JobClient.runJob(firstPhaseJob);
    }
}

我的映射器和Reducer实现看起来像这样,它们都非常简单:

public class FirstPhase {
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, IntWritable> {
        @Override
        public void map(LongWritable longWritable, Text graphLine, OutputCollector<IntWritable, IntWritable> outputCollector, Reporter reporter) throws IOException {
            StringTokenizer tokenizer = new StringTokenizer(graphLine.toString());
            int n1 = Integer.parseInt(tokenizer.nextToken());
            int n2 = Integer.parseInt(tokenizer.nextToken());
            if(n1 > n2) {
                System.out.println("emitting (" + new IntWritable(n1) + ", " + new IntWritable(n2) + ")");
                outputCollector.collect(new IntWritable(n1), new IntWritable(n2));
            }
        }
    }
    public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, IntWritable, Text> {
        @Override
        public void reduce(IntWritable key, Iterator<IntWritable> iterator, OutputCollector<IntWritable, Text> outputCollector, Reporter reporter) throws IOException {
            List<IntWritable> nNodes = new ArrayList<>();
            while(iterator.hasNext()) {
                nNodes.add(iterator.next());
            }
            System.out.println("key: " + key + ", list: " + nNodes);
            // create pairs and emit these
            for(IntWritable n1 : nNodes) {
                for(IntWritable n2 : nNodes) {
                    outputCollector.collect(key, new Text(n1.toString() + " " + n2.toString()));
                }
            }
        }
    }
}

我已经在程序中添加了一些日志记录。在映射阶段,我打印要发出的对。在reduce阶段,我打印reduce的输入。我得到以下输出:

emitting (3, 1)
emitting (3, 2)
key: 3, list: [1, 1]

reduce函数的输入不是我所期望的。我希望它是[1,2]而不是[1,1]。我相信Hadoop会自动组合我从映射阶段的输出中发出的所有对,但我在这里错过了什么吗?任何帮助或解释将不胜感激。

对于开始使用Hadoop MapReduce的人来说,这是一个典型的问题。

问题出在您的减速器上。当循环遍历给定的Iterator<IntWritable>时,每个IntWritable实例都会被重用,因此它只在给定的时间保留一个实例。

这意味着当您调用iterator.next()时,您的第一个保存的IntWritable实例将使用新值进行设置。

您可以在此处阅读有关此问题的更多信息
https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/

相关内容

  • 没有找到相关文章

最新更新