Map/Reduce:如何在完成后输出Hashmap



我想实现DPC(通过快速搜索和查找密度峰值进行聚类)的算法。这是一项艰巨的工作,所以我决定从Rho的计算开始。

地图如下:

 public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] lineSplit = line.split(" ");
            if (Double.parseDouble(lineSplit[2]) < dcThreshold) {
                IntWritable one = new IntWritable(
                        Integer.parseInt(lineSplit[0]));
                IntWritable two = new IntWritable(
                        Integer.parseInt(lineSplit[1]));
                context.write(one, two);
            }
        }

此处为减速器:

public void reduce(IntWritable key, IntWritable values, Context context)
                throws IOException, InterruptedException {
            int[] indexs = new int[2];
            indexs[0] = Integer.parseInt(key.toString());
            indexs[1] = Integer.parseInt(values.toString());
            for (int i = 0; i < indexs.length; i++) {
                densityCountMap.put(indexs[i],
                        densityCountMap.get(indexs[i]) + 1);
            }
        }

问题

densityCountMap是一个散列映射,只有在完成后才能使用。如何输出densityCountMap?以什么方式?

---------解决方案---------

多亏了mbaxi,你提到reduce定义不正确,不需要densityCountMap,这真的激励了我。

我应该更清楚地表明,目标是如果lineSplit[2]低于某个阈值,则增加lineSplit[0]和lineSplit[1]。实际上,没有必要覆盖清理。

映射器:

public static class TokenizerMapper extends
        Mapper<LongWritable, Text, IntWritable, IntWritable> {
    private final static IntWritable count = new IntWritable(1);
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] lineSplit = line.split(" ");
        if (Double.parseDouble(lineSplit[2]) < dcThreshold) {
            IntWritable one = new IntWritable(
                    Integer.parseInt(lineSplit[0]));
            IntWritable two = new IntWritable(
                    Integer.parseInt(lineSplit[1]));
            context.write(one, count);// Both should be increased 
            context.write(two, count);// both as key
        }
    }
}

减速器:

public static class IntSumReducer extends
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);//densityCountMap is redundant if having known better the structure of Map/reduce
            context.write(key, result);//it equals to output densityCountMap
        }
    }

再次感谢,你带来的不仅仅是帮助,还有灵感

您可以覆盖cleanup(上下文上下文)方法,在reduce()方法中继续填充densityCountMap,并在cleanup方法中将内容刷新/写入磁盘。

cleanup()是在处理完所有行之后调用的。

---根据评论部分的要求进行编辑---

如果您使用的是Eclipse编辑器,右键单击您正在扩展的Reducer类,然后单击Source->Override/Inimplement Methods,否则您可以查找javadocs。

private static class RhoCalculationReducer extends Reducer<Text,Text,Text,Text> {
}

在那里你会看到以下方法的列表[请注意,输入参数/数据类型可能会根据你的类定义而变化]-

cleanup(Context)
reduce(Text, Iterable<Text>, Context)
run(Context)
setup(Context)

reduce()或map()函数实际上是被覆盖的实现,您可以在其中提供自己的处理逻辑。setup()和cleanup()可以被认为分别类似于map或reduce任务的构造函数或析构函数。setup()在reduce任务的map开始之前调用,cleanup()在任务结束时调用。

我还看到您的reduce定义是不正确的,而不是"IntWritable values",它应该是"Iterable values",对于reducer,可以确保单个密钥的值由单个reducer处理,这就是签名采用密钥和可迭代值列表的原因。您可能还想将单个键的记录聚合在一起,并且可能不需要额外的densityCountMap,因为reducer已经负责一次性提取给定键的所有值。

相关内容

  • 没有找到相关文章

最新更新