我是一个hadoop初学者。我遇到了这个自定义RecordReader程序,它一次读取3行,并输出3行输入给映射器的次数。
我能够理解为什么使用RecordReader,但是我不能看到当输入格式类本质上扩展mapreduce时,每个InputSplit如何包含3行。TextInputFormat类。根据我的理解,TextInputFormat类为每行(每个n)发出1个InputSplit。
那么RecordReader如何从每个InputSplit读取3行呢?拜托谁解释一下这怎么可能。提前感谢!
您需要了解TextInputFormat
的实现才能发现答案。
让我们深入研究代码。我将谈论新的mapreduce API,但"旧的"mapreduce API非常相似。
正如您所说的,从用户的角度来看,TextInputFormat
根据一些新的行字符将一个split拆分为记录。让我们检查一下实现。
可以看到类几乎是空的。关键函数是createRecord
,由InputFormat
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split,
TaskAttemptContext context
) {
return new LineRecordReader();
}
一般契约是使用InputFormat来获取RecordReader。如果您查看Mapper
和MapContextImpl
内部,您将看到映射器仅使用RecordReader来获取下一个键和值。他什么也不知道。
映射器:
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
MapContextImpl:
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue();
}
现在重新仔细阅读您提供的链接。你会看到:
-
NLinesInputFormat
扩展TextInputFormat
,只覆盖createRecordReader
。基本上,不是使用LineReader
,而是提供自己的RecordReader
。您希望扩展TextInputFormat
而不是层次结构中更高的另一个类,因为它已经处理了在此级别所做的一切,并且您可能需要(压缩,不可分割格式等) -
NLinesRecordReader
才是真正的工作。在initialize
中,它做了从提供的InputSplit
的正确偏移处寻找InputStream
所需的事情。它还创建了一个LineReader
,与TextInputFormat
使用的相同。 - 在
nextKeyValue
方法中,您将看到LineReader.readLine()
被调用三次以获得三行(加上一些逻辑来正确处理拐角情况,如太大的记录,行结束,分割结束)
希望它能帮助你。关键是要理解API的整体设计,以及每个部分如何相互交互。