我正在尝试读取bucket文件,而不将其保存为文件:
import boto3
import botocore
from io import StringIO
import pandas as pd
s3 = boto3.resource('s3',config=botocore.config.Config(signature_version=botocore.UNSIGNED))
bucket = self.s3.Bucket('deutsche-boerse-xetra-pds')
objects = self.bucket.objects.filter(Prefix= date)
file = pd.read_csv(StringIO(self.bucket.Object(key=object.key).get().get('Body').read().decode('utf-8')))
这段代码运行得很好。但是,我希望使用并发(python-asyncio(来加快读取过程。我搜索了文档,但只能找到下载功能的内容,而不能找到获取功能的内容。
你有什么建议吗?
提前谢谢。
我找到了一个可以使用多处理的解决方案,因为我的最终目标是减少处理时间。
如下代码:
def generate_bucket():
s3_resoursce = boto3.resource('s3',config=botocore.config.Config(signature_version=botocore.UNSIGNED))
xetra_bucket = s3_resoursce.Bucket('deutsche-boerse-xetra-pds')
return s3_resoursce, xetra_bucket
def read_csv(object):
s3local, bucket_local = self.generate_bucket()
return pd.read_csv(StringIO(bucket_local.Object(key=object).get().get('Body').read().decode('utf-8')))
def import_raw_data(date: List[str]) -> pd.DataFrame:
import multiprocessing
s3local, bucket_local2 = self.generate_bucket()
objects = [i.key for i in list(bucket_local2.objects.filter(Prefix= date[0]))]
with multiprocessing.Pool(multiprocessing.cpu_count()) as p:
df = pd.concat(p.map(self.read_csv, objects))
return df
对我来说,它是有效的,但我相信有可能改进这个代码。我愿意接受建议。