如何通过sftp进行异步文件传输



我试图解决的问题如下:我有一台台式电脑,里面有大量的数据(大约5 TB(,我想分析这些数据。数据由500k个文件组成,每个文件都可以单独分析。对于分析,我在大学里有一系列可用的服务器,然而,服务器没有空间容纳所有这些数据,也没有空间存储分析的输出。

因此,我的想法是将数据分段复制到服务器,运行分析,将结果传输回桌面,删除服务器上的输入和输出数据,然后重复。

对于文件传输,我昨天安装了paramiko,它似乎工作得很好:

remote_get = 'test'
local_deliver = './test'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.load_host_keys(os.path.expanduser(os.path.join("~", ".ssh", "known_hosts")))
ssh.connect(server, username=username, password=password)
sftp = ssh.open_sftp()
for root, dirs, files in os.walk(local_path):
for fname in files:
full_fname = os.path.join(root, fname)
full_remote = os.path.join(remote_path, fname)
sftp.put(full_fname, full_remote)
sftp.close()
ssh.close()

然而,我唯一的问题是,我需要传输的数据量可能需要几天的时间才能来回传输,因此,如果可能的话,我希望异步启动数据传输,这样我就可以在传输下一个要分析的数据集的同时对当前数据集进行分析。

但我不知道如何做这样的事情,有人能给我指明正确的方向吗?

此解决方案使用multiprocessing.Pool创建单独进程的任务池。每次调用apply_async时,都会传递一个函数指针和一个参数列表。在这种情况下,要执行的函数是copy_file,arg是文件名:

import os
import paramiko
from multiprocessing import Pool
remote_get = 'test'
local_deliver = './test'
pool = Pool(processes=4)  # Experiment with this number based on your # CPUs
def copy_file(filename):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.load_host_keys(os.path.expanduser(os.path.join("~", ".ssh", "known_hosts")))
ssh.connect(server, username=username, password=password)
sftp = ssh.open_sftp()
full_fname = os.path.join(root, fname)
full_remote = os.path.join(remote_path, fname)
sftp.put(full_fname, full_remote)
sftp.close()
ssh.close()
for root, dirs, files in os.walk(local_deliver):
for fname in files:
pool.apply_async(copy_file, [fname])

你的原始版本中有几个变量没有被考虑在内,所以我用了我最好的猜测。ssh和sftp客户端创建需要在copy_file中移动,因为除非它是可序列化的,否则您无法保存它并在进程之间共享它。

multiprocessing.Pool上的processes参数可以根据CPU数量进行调整,但请记住,您将在这里与多个瓶颈作斗争:1。CPU,2。NIC带宽限制,3。磁盘I/O限制。

以下是更多的多处理文档:https://docs.python.org/3/library/multiprocessing.html#using-员工

编辑:我还记得帕拉米科的SFTP与执行SFTP命令的速度明显较慢。为了获得更好的性能,编写批处理文件并使用subprocess.call执行它们可能是值得的。

最新更新