mapreduce.TextInputFormat hadoop



我是一个hadoop初学者。我遇到了这个自定义RecordReader程序,它一次读取3行,并输出3行输入给映射器的次数。

我能够理解为什么使用RecordReader,但是我不能看到当输入格式类本质上扩展mapreduce时,每个InputSplit如何包含3行。TextInputFormat类。根据我的理解,TextInputFormat类为每行(每个n)发出1个InputSplit。

那么RecordReader如何从每个InputSplit读取3行呢?拜托谁解释一下这怎么可能。提前感谢!

您需要了解TextInputFormat的实现才能发现答案。

让我们深入研究代码。我将谈论新的mapreduce API,但"旧的"mapreduce API非常相似。

正如您所说的,从用户的角度来看,TextInputFormat根据一些新的行字符将一个split拆分为记录。让我们检查一下实现。

可以看到类几乎是空的。关键函数是createRecord,由InputFormat

定义
@Override
public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, 
        TaskAttemptContext context
) {
   return new LineRecordReader();
}

一般契约是使用InputFormat来获取RecordReader。如果您查看MapperMapContextImpl内部,您将看到映射器仅使用RecordReader来获取下一个键和值。他什么也不知道。

映射器:

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);

}

MapContextImpl:

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  return reader.nextKeyValue();
}

现在重新仔细阅读您提供的链接。你会看到:

  • NLinesInputFormat扩展TextInputFormat,只覆盖createRecordReader。基本上,不是使用LineReader,而是提供自己的RecordReader。您希望扩展TextInputFormat而不是层次结构中更高的另一个类,因为它已经处理了在此级别所做的一切,并且您可能需要(压缩,不可分割格式等)
  • NLinesRecordReader才是真正的工作。在initialize中,它做了从提供的InputSplit的正确偏移处寻找InputStream所需的事情。它还创建了一个LineReader,与TextInputFormat
  • 使用的相同。
  • nextKeyValue方法中,您将看到LineReader.readLine()被调用三次以获得三行(加上一些逻辑来正确处理拐角情况,如太大的记录,行结束,分割结束)

希望它能帮助你。关键是要理解API的整体设计,以及每个部分如何相互交互。

最新更新