在 hadoop 中使用可变长度/非分隔二进制文件进行拆分



我刚刚开始研究一个基于Hadoop的ingester,用于打开街道地图数据。 有几种格式 - 但我一直在针对基于协议缓冲区的格式(注意 - 它不是纯 pb)。

在我看来,将文件预先拆分为序列文件会更有效 - 而不是以自定义记录阅读器/输入格式处理可变长度编码 - 但想要进行健全性检查。

PBF 格式说明中更详细地描述了该格式但基本上它是[BlobHeader,Blob]块的集合。

有一个 Blob 标头

message BlobHeader {
   required string type = 1;
   optional bytes indexdata = 2;
   required int32 datasize = 3;
 }

然后是 Blob(其大小由标头中的 datasize 参数定义)

 message Blob {
   optional bytes raw = 1; // No compression
   optional int32 raw_size = 2; // Only set when compressed, to the uncompressed size
   optional bytes zlib_data = 3;
   // optional bytes lzma_data = 4; // PROPOSED.
   // optional bytes OBSOLETE_bzip2_data = 5; // Deprecated.
 }

一旦你进入 blob 显然会有更多的结构 - 但我会在映射器中处理它 - 我想做的最初是每个映射器有一个 blob(以后可能是每个映射器的一些 blob 的倍数)。

其他一些输入格式/记录阅读器使用"足够大"的拆分大小,然后向后/向前寻找分隔符 - 但由于没有分隔符可以让我知道 blob/标头的偏移量 - 也没有指向它们的索引 - 我看不到任何方法在不首先流式传输文件的情况下获得我的拆分点。

现在,我不需要实际从磁盘上读取整个文件 - 我可以从读取标头开始,使用该信息查找 blob,将其设置为第一个拆分点,然后重复。 但这是我能想到的预拆分为序列文件的唯一选择。

有没有更好的方法来解决这个问题 - 或者如果没有,对这两个建议的想法?

好吧,我在getSplits方法中解析了二进制文件 - 由于我跳过了超过99%的数据,因此速度非常快(planet-osm 20GB世界文件~20秒)。 这是getSplits方法,如果其他人绊倒了。

@Override
public List<InputSplit> getSplits(JobContext context){
    List<InputSplit> splits = new ArrayList<InputSplit>();
    FileSystem fs = null;
    Path file = OSMPBFInputFormat.getInputPaths(context)[0]; 
    FSDataInputStream in = null;
    try {
        fs = FileSystem.get(context.getConfiguration());
        in = fs.open(file);
        long pos = 0;
        while (in.available() > 0){
            int len = in.readInt(); 
            byte[] blobHeader = new byte[len]; 
            in.read(blobHeader);
            BlobHeader h = BlobHeader.parseFrom(blobHeader);
            FileSplit split = new FileSplit(file, pos,len + h.getDatasize(), new String[] {});
            splits.add(split);
            pos += 4;
            pos += len;
            pos += h.getDatasize();
            in.skip(h.getDatasize());
        }
    } catch (IOException e) {
        sLogger.error(e.getLocalizedMessage());
    } finally {
        if (in != null) {try {in.close();}catch(Exception e){}};
        if (fs != null) {try {fs.close();}catch(Exception e){}};
    }
    return splits;
}

到目前为止工作正常 - 尽管我还没有对输出进行地面验证。 这肯定比将 pbf 复制到 hdfs 更快,在单个映射器中转换为序列,然后摄取(复制时间占主导地位)。 这也比将外部程序复制到 hdfs 中的序列文件,然后针对 hdfs 运行映射器(编写后者脚本)快 ~20%。所以这里没有抱怨。

请注意,这会为每个块生成一个映射器 - 这是行星世界文件的 ~23k 映射器。 我实际上每次拆分捆绑多个块 - 只需循环 x 次,然后拆分才会添加到集合中。

对于 BlobHeader,我刚刚从上面的 OSM wiki 链接编译了 protobuf .proto 文件。 如果需要,您也可以从OSM二进制类中预先生成它 - maven片段是:

<dependency>
    <groupId>org.openstreetmap.osmosis</groupId>
    <artifactId>osmosis-osm-binary</artifactId>
    <version>0.43-RELEASE</version>
</dependency>

最新更新