如何使用Airflow FTPHook从GCS向ftp服务器上传/下载文件



我目前正在尝试使用Airflow中的FTPHook,以便将文件上传到远程ftp或从远程ftp下载文件。但我不确定是否可以使用gs://路径作为源/目标路径的一部分。我目前不想在AF pod中使用本地文件夹,因为文件大小可能会变大,所以我宁愿直接使用gcs路径或gcs文件流。

conn = FTPHook(ftp_conn_id='ftp_default')
conn.store_file('in', 'gs://bucket_name/file_name.txt')

FTPHook代码的链接:此处

谢谢你的帮助!

我找到了一个简单的流媒体解决方案,可以使用pysftp从gcs上传/下载到ftp服务器,反之亦然,我想与您分享。首先,我发现了这个解决方案,它运行得很好,但该解决方案的唯一问题是它不支持将文件从gcs上传到FTP。所以我在寻找别的东西。所以我一直在寻找不同的方法,所以我找到了这个谷歌文档,它基本上允许你流式传输到blob文件,这正是我想要的。

params = BaseHook.get_connection(self.ftp_conn_id)
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
ftp = pysftp.Connection(host=params.host, username=params.login, password=params.password,
port=params.port,
cnopts=cnopts)
#This will download file from FTP server to GCS location
with ftp.open(self.ftp_folder + '/' + file_to_load, 'r+') as remote_file:
blob = bucket.blob(self.gcs_prefix + file_to_load)
blob.upload_from_file(remote_file)
#This will upload file from GCS to FTP server
with sftp.open(self.ftp_folder + '/' +file_name,'w+') as remote_file:
blob = bucket.blob(fileObject['name'])
blob.download_to_file(remote_file)

GCS没有实现FTP支持,所以这不起作用。看起来FTP挂钩只知道如何处理本地文件路径或缓冲区,而不知道如何处理GCS API。

您可能能够找到(或编写(一些从FTP读取并写入GCS的代码。

最新更新