我必须从本地/hdfs/kafka读取gz文件,并解压缩并解析它。谁对此有任何经验?
或者另一种类型喜欢垃圾桶.tar.gz
您可以使用
sc.binaryFiles
读取二进制文件并对内容字节执行任何您喜欢的操作。
至于 tar.gz,请参阅 从 Spark 中的压缩中读取整个文本文件
这是我所做的:1. 读取二进制数据 = sc.binaryFiles(路径)2. 提取内容
data = (data
.map(lambda x: (x[0], ungzip(x[1])))
)
def ungzip(df):
compressed_file = io.BytesIO(df)
decompressed_file = gzip.GzipFile(fileobj=compressed_file)
return decompressed_file.read()
- 解析消息
定义_VarintDecoder(掩码):
local_ord = ord
def DecodeVarint(buffer, pos):
result = 0
shift = 0
while 1:
if pos > len(buffer) - 1:
raise NotEnoughDataExcption("Not enough data to decode varint")
b = local_ord(buffer[pos])
result |= ((b & 0x7f) << shift)
pos += 1
if not (b & 0x80):
result &= mask
return (result, pos)
shift += 7
if shift >= 64:
raise ValueError('Too many bytes when decoding varint.')
return DecodeVarint
.
def parse_binary(data):
decoder = _VarintDecoder((1 << 64) - 1)
next_pos, pos = 0, 0
messages = []
try:
while 1:
next_pos, pos = decoder(data[1], pos)
messages.append((data[0], data[1][pos:pos + next_pos]))
pos += next_pos
except:
return messages
.
data = (data
.flatMap(lambda x: parse_binary(x))
)
在此之后,您将每行一个 protobuf 消息,您可以并行应用 protobuf_parsing 函数