我想处理位于 s3 存储桶中的一个巨大的 XML 文件文件夹。我想在Python中以分布式方式做到这一点。
所以我首先定义一个带有boto3
库的分页器,设置(例如(100
元素的页面大小(参考文档(
profile = boto3.Session()
client = profile.client('s3')
paginator = client.get_paginator('list_objects')
page_iterator = paginator.paginate(
Bucket='my-bucket',Prefix='my-prefix',
PaginationConfig={'PageSize': 100}
)
之后concurrent.futures
包我创建了一个10
线程池,每个线程调用my_process_method
:
def my_process_method(pages):
for page in pages['Contents']:
# ...process...
with concurrent.futures.ThreadPoolExecutor(10) as executor:
executor.map(my_process_method, page_iterator)
我想知道这个例子中是否有一些缺点,例如并发boto3
API 调用可能会导致某种问题
这段代码对你有用吗?如果我做类似的事情,我总是只得到第一页的前 1000 个文件
global keys
keys = []
def my_process_method(pages):
for item in page['Contents']:
keys.append(item.get('Key'))