Hadoop FileSplit reading



假设客户端应用程序使用FileSplit对象从相应的文件中读取实际字节。

要做到这一点,必须通过以下代码从FileSplit创建InputStream对象:
    FileSplit split = ... // The FileSplit reference
    FileSystem fs   = ... // The HDFS reference
    FSDataInputStream fsin = fs.open(split.getPath());
    long start = split.getStart()-1; // Byte before the first
    if (start >= 0)
    {
        fsin.seek(start);
    }

流的-1调整存在于Hadoop MapReduce LineRecordReader类等一些场景中。然而,FSDataInputStream seek()方法的文档明确地指出,在寻找一个位置之后,下一次读取将从该位置读取,这意味着(?)上面的代码将偏离1字节(?)。

所以,问题是,"-1"调整是必要的所有InputSplit读取情况?顺便说一下,如果想要正确地读取FileSplit,寻找它的开始是不够的,因为每个分割也有一个结束,可能与实际HDFS文件的结束不相同。因此,相应的InputStream应该是"有界的",即有一个最大长度,如下所示:
    InputStream is = new BoundedInputStream(fsin, split.getLength());

在这种情况下,在上面创建了"本地"fsin蒸汽之后,使用org.apache.commons.io.input.BoundedInputStream类来实现"绑定"。

显然,只有在LineRecordReader类的用例行中才需要调整,这超过了分割的边界,以确保它读取完整的最后一行。

在MAPREDUCE-772的早期问题和评论中可以找到关于这个问题的更多细节的讨论。

寻求位置0将意味着下一次调用InputStream.read()将读取字节0。查找位置-1很可能会抛出异常。

当您谈到示例和源代码中的标准模式时,您具体指的是哪里?

拆分并不一定是有界的——以TextInputFormat和可以拆分的文件为例。处理分割的记录读取器将:

  • 查找开始索引,然后找到下一个换行符
  • 查找下一个换行字符(或EOF)并返回该'行'作为下一个记录

这个过程一直重复,直到找到的下一个换行符超过了分割的末尾,或者找到了EOF。所以你可以看到,在这种情况下,实际的分割边界可能会比输入分割

给出的边界右移

引用LineRecordReader中的代码块:

if (codec != null) {
  in = new LineReader(codec.createInputStream(fileIn), job);
  end = Long.MAX_VALUE;
} else {
  if (start != 0) {
    skipFirstLine = true;
    --start;
    fileIn.seek(start);
  }
  in = new LineReader(fileIn, job);
}
if (skipFirstLine) {  // skip first line and re-establish "start".
  start += in.readLine(new Text(), 0,
                       (int)Math.min((long)Integer.MAX_VALUE, end - start));
}

--start语句最有可能是为了避免从换行符开始的分割,并返回空行作为第一个记录。您可以看到,如果发生查找,将跳过第一行,以确保文件分割不会返回重叠的记录

相关内容

最新更新