我刚刚开始研究一个基于Hadoop的ingester,用于打开街道地图数据。 有几种格式 - 但我一直在针对基于协议缓冲区的格式(注意 - 它不是纯 pb)。
在我看来,将文件预先拆分为序列文件会更有效 - 而不是以自定义记录阅读器/输入格式处理可变长度编码 - 但想要进行健全性检查。
PBF 格式说明中更详细地描述了该格式但基本上它是[BlobHeader,Blob]块的集合。
有一个 Blob 标头
message BlobHeader {
required string type = 1;
optional bytes indexdata = 2;
required int32 datasize = 3;
}
然后是 Blob(其大小由标头中的 datasize 参数定义)
message Blob {
optional bytes raw = 1; // No compression
optional int32 raw_size = 2; // Only set when compressed, to the uncompressed size
optional bytes zlib_data = 3;
// optional bytes lzma_data = 4; // PROPOSED.
// optional bytes OBSOLETE_bzip2_data = 5; // Deprecated.
}
一旦你进入 blob 显然会有更多的结构 - 但我会在映射器中处理它 - 我想做的最初是每个映射器有一个 blob(以后可能是每个映射器的一些 blob 的倍数)。
其他一些输入格式/记录阅读器使用"足够大"的拆分大小,然后向后/向前寻找分隔符 - 但由于没有分隔符可以让我知道 blob/标头的偏移量 - 也没有指向它们的索引 - 我看不到任何方法在不首先流式传输文件的情况下获得我的拆分点。
现在,我不需要实际从磁盘上读取整个文件 - 我可以从读取标头开始,使用该信息查找 blob,将其设置为第一个拆分点,然后重复。 但这是我能想到的预拆分为序列文件的唯一选择。
有没有更好的方法来解决这个问题 - 或者如果没有,对这两个建议的想法?
好吧,我在getSplits方法中解析了二进制文件 - 由于我跳过了超过99%的数据,因此速度非常快(planet-osm 20GB世界文件~20秒)。 这是getSplits方法,如果其他人绊倒了。
@Override
public List<InputSplit> getSplits(JobContext context){
List<InputSplit> splits = new ArrayList<InputSplit>();
FileSystem fs = null;
Path file = OSMPBFInputFormat.getInputPaths(context)[0];
FSDataInputStream in = null;
try {
fs = FileSystem.get(context.getConfiguration());
in = fs.open(file);
long pos = 0;
while (in.available() > 0){
int len = in.readInt();
byte[] blobHeader = new byte[len];
in.read(blobHeader);
BlobHeader h = BlobHeader.parseFrom(blobHeader);
FileSplit split = new FileSplit(file, pos,len + h.getDatasize(), new String[] {});
splits.add(split);
pos += 4;
pos += len;
pos += h.getDatasize();
in.skip(h.getDatasize());
}
} catch (IOException e) {
sLogger.error(e.getLocalizedMessage());
} finally {
if (in != null) {try {in.close();}catch(Exception e){}};
if (fs != null) {try {fs.close();}catch(Exception e){}};
}
return splits;
}
到目前为止工作正常 - 尽管我还没有对输出进行地面验证。 这肯定比将 pbf 复制到 hdfs 更快,在单个映射器中转换为序列,然后摄取(复制时间占主导地位)。 这也比将外部程序复制到 hdfs 中的序列文件,然后针对 hdfs 运行映射器(编写后者脚本)快 ~20%。所以这里没有抱怨。
请注意,这会为每个块生成一个映射器 - 这是行星世界文件的 ~23k 映射器。 我实际上每次拆分捆绑多个块 - 只需循环 x 次,然后拆分才会添加到集合中。
对于 BlobHeader,我刚刚从上面的 OSM wiki 链接编译了 protobuf .proto 文件。 如果需要,您也可以从OSM二进制类中预先生成它 - maven片段是:
<dependency>
<groupId>org.openstreetmap.osmosis</groupId>
<artifactId>osmosis-osm-binary</artifactId>
<version>0.43-RELEASE</version>
</dependency>