hadoopjava:如何知道reducer输入已到达末尾



我的减速器看起来像这个

public static class Reduce extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
        List<Text> allRecords = new ArrayList<Text>();
        public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
                allRecords.add(values.next());
                Text[] outputValues = new Text[7];
                for (int i=1; i>=7; i++) {
                    outputValues[i-1] = allRecords.get(allRecords.size() - i);
                }
        }
    }
  • 我只有一个减速器
  • 当reducer完成工作时,我需要收集前7条记录

问题

  • 如何知道收到减速器末端输入
    谢谢

我认为您误解了为映射的每个值编写的键的用途。键的目的是将元素分组为对reducer的特定调用。由于您希望同时考虑代码中的所有值,因此只需要使用一个密钥,如下所示:

public class MyMapper<K extends WritableComparable, V extends Writable> 
     extends MapReduceBase implements Mapper<IntWriteable, WhateverTheInputTypeWas,
                                             IntWriteable, Text> {
  public void map(IntWriteable key, WhateverTheInputTypeWas val,
                  OutputCollector<IntWriteable, Text> output, Reporter reporter)
    // do some processing
    output.collect(new IntWriteable(1), ...);
  }
}

基础结构自动收集特定密钥的所有值,并在对reduce的单个调用中显示这些值。这就是为什么reduce采用值的Iterator,而不仅仅是一个值。您所需要做的就是遍历整个迭代器,当hasNext()返回false时,即到达reduce函数对该特定键的输入的末尾。

public static class Reduce extends MapReduceBase 
                           implements Reducer<IntWritable, Text, 
                                              IntWritable, Text> {
  public void reduce(IntWritable key, Iterator<Text> values,
                     OutputCollector<IntWritable, Text> output,
                     Reporter reporter) throws IOException {
    int i=0
    Text[] outputValues = new Text[7];
    while (values.hasNext() && i < 7) {
      outputValues[i++] = values.next();
    }
    // now output the contents of outputValues to the OutputCollector
  }
}

如果您在reducer中进行的其他计算需要不同的键,那么也可以从映射器中输出这些键,并具有一个特殊的sentinel值(可能是-1,取决于键的含义),该值可以为映射的每个数据元素获取输出,然后仅当键等于sentinel时才运行此特殊逻辑。

您应该循环通过:

for (Text t : values) {
}

或者:

while (values.hasNext()) {
   Text t = values.next()
}

如果我理解你的问题,当reducer处理所有数据时,你需要一些通知
我知道的一点是输出格式中的close方法:
public void close(TaskAttemptContext上下文)
您可以在输出格式中重写此方法。它将在相关减速器完成工作后调用。

现在已经很晚了,但这可能对搜索相同问题的人有所帮助。

打开一个文件并将它写入到您想要查看的文件中。

例如,要查看哪个Reduce工作人员执行代码的哪一部分,您可以执行以下操作:

class myReducer extends Reducer{
     File f;
     void setup(){
          // open your file here
     }
     void reduce(){
          //write key/value or whatever whatever you want to see here
          //and your reduce method
     }
}

通过这种方式,你可以很容易地看到你的错误等…

相关内容

  • 没有找到相关文章

最新更新