是否可以结合使用Paramiko和Dask的read_csv()方法从远程服务器读取.csv?



今天我开始使用DaskParamiko包,部分是作为学习练习,部分原因是我开始了一个需要处理大型数据集(10 GB )的项目,这些数据集必须仅从远程 VM 访问(即无法在本地存储)。

以下代码段属于一个简短的帮助程序,该程序将创建 VM 上托管的大型 csv 文件的 dask 数据帧。我想稍后将其输出(对 dask 数据帧的引用)传递给第二个函数,该函数将对其执行一些概述分析。

import dask.dataframe as dd
import paramiko as pm
import pandas as pd
import sys
def remote_file_to_dask_dataframe(remote_path):
if isinstance(remote_path, (str)):
try:
client = pm.SSHClient()
client.load_system_host_keys()
client.connect('#myserver', username='my_username', password='my_password')
sftp_client = client.open_sftp()
remote_file = sftp_client.open(remote_path)
df = dd.read_csv(remote_file)
remote_file.close()
sftp_client.close()
return df 
except:
print("An error occurred.")
sftp_client.close()
remote_file.close()
else:
raise ValueError("Path to remote file as string required")

代码既不好也不完整,我会及时用ssh密钥替换用户名和密码,但这不是问题所在。在 jupyter 笔记本中,我之前使用服务器上文件的路径打开了 sftp 连接,并使用常规的 Pandas read_csv调用将其读入数据帧。但是,这里的等效行,使用 Dask,是问题的根源:df = dd.read_csv(remote_file)

我已经在线查看了文档(这里),但我无法判断我上面尝试的内容是否可行。似乎对于网络选项,Dask 想要一个 url。例如 S3 的参数传递选项似乎取决于该基础设施的后端。不幸的是,我无法理解破折号-ssh文档(这里)。

我已经浏览了打印语句,唯一无法执行的行是声明的行。引发的错误是:引发类型错误('网址类型未理解: %s' % 网址路径) 类型错误: URL 类型不被理解:

谁能指出我实现我想要做的事情的正确方向?我本以为达斯克的read_csv会像熊猫一样发挥作用,因为它是基于相同的。

我将不胜感激任何帮助,谢谢。

附言我知道 Pandas 的read_csvchunksize 选项,但如果可能的话,我想通过 Dask 来实现这一点。

在 Dask 的主版本中,文件系统操作现在正在使用fsspec,它与以前的实现(s3、gcs、hdfs)一起现在支持一些额外的文件系统,请参阅fsspec.registry.known_implementations到协议标识符的映射。

简而言之,如果您从 master 安装 fsspec 和 Dask,使用像 "sftp://user:pw@host:port/path" 这样的 url 现在应该适合您。

似乎您必须实现他们的"文件系统"接口。

我不确定您需要实现的最小方法集是什么才能允许read_csv.但是您绝对必须实施open.

class SftpFileSystem(object):
def open(self, path, mode='rb', **kwargs):
return sftp_client.open(path, mode)
dask.bytes.core._filesystems['sftp'] = SftpFileSystem
df = dd.read_csv('sftp://remote/path/file.csv')

最新更新