PyArrow:如何使用新的文件系统接口从本地复制文件到远程?



有人能给我一个提示,我如何从本地文件系统复制文件到HDFS文件系统使用PyArrow的新文件系统接口(即上传,copyFromLocal)?

我前后阅读了文档,并尝试了一些事情(使用带有FS uri的copy_file()),但似乎都不起作用。旧的HDFS API的使用很简单,但它已被弃用,尽管新的API似乎不完整。当然,在文件描述符之间移动数据块是一种解决方案,但是为什么存在copy_file()呢?

新的(或旧的)文件系统api中没有用于在文件系统之间传输文件的函数。

当然在文件描述符之间移动数据块是一个解决方案

我不确定这是不是你想的,但这里有一个简单的实用程序(和演示),关于如何从python中做到这一点:

import filecmp
import pyarrow.fs as pafs
BATCH_SIZE = 1024 * 1024
def transfer_file(in_fs, in_path, out_fs, out_path):
with in_fs.open_input_stream(in_path) as in_file:
with out_fs.open_output_stream(out_path) as out_file:
while True:
buf = in_file.read(BATCH_SIZE)
if buf:
out_file.write(buf)
else:
break
local_fs = pafs.LocalFileSystem()
s3fs = pafs.S3FileSystem()
in_path = '/tmp/in.data'
out_path = 'mybucket/test.data'
back_out_path = '/tmp/in_copy.data'
transfer_file(local_fs, in_path, s3fs, out_path)
transfer_file(s3fs, out_path, local_fs, back_out_path)
files_match = filecmp.cmp(in_path, back_out_path)
print(f'Files Match: {files_match}')

我期望transfer_file获得良好的性能。在某些情况下(例如从S3读取),使用read_at进行并行读取可能会受益,这将需要更多的复杂性,但也应该是可行的。

但是为什么copy_file()存在呢?

copy_file将文件从文件系统中的一个名称复制到同一文件系统中的另一个名称。它不能用于在不同文件系统之间复制文件。

添加到@Pace的答案(太长了):我正在复制gzip文件(*.gz), pyarrow(默认情况下)解压每个read()调用,然后再次压缩每个write()调用。我用Pace版本的代码中显示的print语句验证了这一点。

因此,要使显著地更快地传输/复制,请关闭压缩:
def transfer_file(in_fs, in_path, out_fs, out_path):
with in_fs.open_input_stream(in_path, compression=None) as in_file:
with out_fs.open_output_stream(out_path, compression=None) as out_file:
while True:
buf = in_file.read(BATCH_SIZE)
if buf:
print(f'buf size: len(buf)')
out_file.write(buf)
else:
break

最新更新