如何使用 dask 延迟将 dask 数据帧写入 aws s3 中的单个 csv,以便速度更快?



目前我正在使用下面的代码,但它花费了太多时间。因为我正在将 dask 数据帧转换为缓冲区并使用分段上传将其上传到 s3 中

def multi_part_upload_with_s3(file_buffer_obj,BUCKET_NAME,key_path):
client = boto3.client('s3')
s3 = boto3.resource('s3') 
config = TransferConfig(multipart_threshold=1024 *25,max_concurrency=10,multipart_chunksize=1024 * 25,use_threads=True)
s3.meta.client.upload_fileobj(file_buffer_obj, BUCKET_NAME, key_path,Config=config)
ddf.compute().to_csv(target_buffer_old,sep=",")
target_buffer_old=io.BytesIO(target_buffer_old.getvalue().encode())
multi_part_upload_with_s3(target_buffer_old,"bucket","key/file.csv")

我建议您使用 dask 并行写入单独的 S3 文件(这是默认的工作方式),然后使用多部分上传将输出合并在一起。您可以使用s3fs方法merge执行此操作。请注意,您将希望在没有标头的情况下编写。

最新更新