在 hadoop 中计算"n"最大值



我有一个场景。前一个作业的输出1

在下一个工作中,我需要找到具有最大值的i键。eg i= 3,3个键值最大(i为自定义参数)

如何处理这个

我们是否应该在job2映射器中计算max,因为会有唯一的键,因为输出来自之前的reducer或在第二作业减速器中发现max。但是如何找到i键呢?

我试过了而不是发出值作为值在减速器。我发出value作为键,这样我就可以按升序获得值。我写了下一个MR工作。其中mapper只是发出键/值。

Reducer找到key的最大值但是当我们试图获取id时,我又被困住了,因为id是唯一的,值不是唯一的

如何解决这个问题

有谁能提出一个解决方案吗?

可以用PriorityQueue找到顶级i键。简单的代码来说明这个想法:

public static class TopNMapper extends Mapper<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
    private static class MyPair implements Comparable<MyPair> {
        public final int key;
        public final double value;
        MyPair(int key, double value) {
            this.key = key;
            this.value = value;
        }
        @Override
        public int compareTo(MyPair o) {
            return -Double.compare(value, o.value); // i'm not sure about '-'
        }
    }
    private final PriorityQueue<MyPair> topN = new PriorityQueue<MyPair>();
    @Override
    protected void map(IntWritable key, DoubleWritable value, Context context) throws IOException, InterruptedException {
        if (Double.isNaN(value.get())) {
            return; // not a number
        }
        topN.add(new MyPair(key.get(), value.get()));
        if (topN.size() <= 50) { // simple optimization
            return;
        }
        while (topN.size() > 3) { // retain only top 3 elements in queue
            topN.poll();
        }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        while (topN.size() > 3) {
            topN.poll(); // retain only top 3 elements in queue
        }
        for (MyPair myPair : topN) { // write top 3 elements
            context.write(new IntWritable(myPair.key), new DoubleWritable(myPair.value));
        }
    }
}

如果您运行这个映射器(一个用于所有输入),您应该得到3个具有最大值的键。

相关内容

  • 没有找到相关文章