如何在Spark / Spark Streaming中读取二进制-protobuf gz文件



我必须从本地/hdfs/kafka读取gz文件,并解压缩并解析它。谁对此有任何经验?

或者另一种类型喜欢垃圾桶.tar.gz

您可以使用

sc.binaryFiles读取二进制文件并对内容字节执行任何您喜欢的操作。

至于 tar.gz,请参阅 从 Spark 中的压缩中读取整个文本文件

这是我所做的:1. 读取二进制数据 = sc.binaryFiles(路径)2. 提取内容

data = (data
        .map(lambda x: (x[0], ungzip(x[1])))
        )

def ungzip(df):
    compressed_file = io.BytesIO(df)
    decompressed_file = gzip.GzipFile(fileobj=compressed_file)
    return decompressed_file.read()
  1. 解析消息

定义_VarintDecoder(掩码):

    local_ord = ord
    def DecodeVarint(buffer, pos):
        result = 0
        shift = 0
        while 1:
            if pos > len(buffer) - 1:
                raise NotEnoughDataExcption("Not enough data to decode varint")
            b = local_ord(buffer[pos])
            result |= ((b & 0x7f) << shift)
            pos += 1
            if not (b & 0x80):
                result &= mask
                return (result, pos)
            shift += 7
            if shift >= 64:
                raise ValueError('Too many bytes when decoding varint.')
    return DecodeVarint

.

def parse_binary(data):
    decoder = _VarintDecoder((1 << 64) - 1)
    next_pos, pos = 0, 0
    messages = []
    try:
        while 1:
            next_pos, pos = decoder(data[1], pos)
            messages.append((data[0], data[1][pos:pos + next_pos]))
            pos += next_pos
    except:
        return messages

.

data = (data
        .flatMap(lambda x: parse_binary(x))
        )

在此之后,您将每行一个 protobuf 消息,您可以并行应用 protobuf_parsing 函数

最新更新