假设客户端应用程序使用FileSplit
对象从相应的文件中读取实际字节。
FileSplit
创建InputStream
对象:
FileSplit split = ... // The FileSplit reference
FileSystem fs = ... // The HDFS reference
FSDataInputStream fsin = fs.open(split.getPath());
long start = split.getStart()-1; // Byte before the first
if (start >= 0)
{
fsin.seek(start);
}
流的-1调整存在于Hadoop
MapReduce
LineRecordReader
类等一些场景中。然而,FSDataInputStream
seek()
方法的文档明确地指出,在寻找一个位置之后,下一次读取将从该位置读取,这意味着(?)上面的代码将偏离1字节(?)。
FileSplit
,寻找它的开始是不够的,因为每个分割也有一个结束,可能与实际HDFS文件的结束不相同。因此,相应的InputStream
应该是"有界的",即有一个最大长度,如下所示:
InputStream is = new BoundedInputStream(fsin, split.getLength());
在这种情况下,在上面创建了"本地"fsin
蒸汽之后,使用org.apache.commons.io.input.BoundedInputStream
类来实现"绑定"。
显然,只有在LineRecordReader
类的用例行中才需要调整,这超过了分割的边界,以确保它读取完整的最后一行。
寻求位置0将意味着下一次调用InputStream.read()将读取字节0。查找位置-1很可能会抛出异常。
当您谈到示例和源代码中的标准模式时,您具体指的是哪里?
拆分并不一定是有界的——以TextInputFormat和可以拆分的文件为例。处理分割的记录读取器将:
- 查找开始索引,然后找到下一个换行符
- 查找下一个换行字符(或EOF)并返回该'行'作为下一个记录
这个过程一直重复,直到找到的下一个换行符超过了分割的末尾,或者找到了EOF。所以你可以看到,在这种情况下,实际的分割边界可能会比输入分割
给出的边界右移引用LineRecordReader中的代码块:
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
--start
语句最有可能是为了避免从换行符开始的分割,并返回空行作为第一个记录。您可以看到,如果发生查找,将跳过第一行,以确保文件分割不会返回重叠的记录