Python3中concurrent.futures.ThreadPoolExecutor的内存使用情况



我正在构建一个脚本,用于下载和解析奥巴马医改交易所医疗保险计划的福利信息。其中一部分需要从每个保险公司下载和解析计划福利JSON文件。为了做到这一点,我使用concurrent.futures.ThreadPoolExecutor和6个工作人员来下载每个文件(带有urllib),解析和循环JSON,并提取相关信息(存储在脚本中的嵌套字典中)。

(在win32上运行Python 3.5.1(v3.5.1:37a07ce59692015年12月6日01:38:48)[MSC v.1900 32位(英特尔)])

问题是,当我同时执行此操作时,脚本在下载\解析\循环通过JSON文件后似乎不会释放内存,过了一段时间后,它崩溃了,malloc引发了内存错误。

然而,当我用一个简单的for in循环串行执行时,程序不会崩溃,也不会占用大量内存。

def load_json_url(url, timeout):
req = urllib.request.Request(url, headers={ 'User-Agent' : 'Mozilla/5.0' })
resp = urllib.request.urlopen(req).read().decode('utf8')
return json.loads(resp) 

with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_json_url, url, 60): url for url in formulary_urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
# The below timeout isn't raising the TimeoutError.
data = future.result(timeout=0.01)
for item in data:
if item['rxnorm_id']==drugid: 
for row in item['plans']:
print (row['drug_tier'])
(plansid_dict[row['plan_id']])['drug_tier']=row['drug_tier']
(plansid_dict[row['plan_id']])['prior_authorization']=row['prior_authorization']
(plansid_dict[row['plan_id']])['step_therapy']=row['step_therapy']
(plansid_dict[row['plan_id']])['quantity_limit']=row['quantity_limit']
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))

else:
downloaded_plans=downloaded_plans+1

这不是你的错。as_complete()在完成之前不会发布其期货。已经记录了一个问题:https://bugs.python.org/issue27144

目前,我认为大多数方法是将as_complete()封装在另一个循环中,根据你想花多少RAM和你的结果会有多大,将其分块为合理数量的未来。它会在每个块上进行阻塞,直到所有工作都完成后再进入下一个块,所以速度要慢一些,或者可能会在中间停留很长一段时间,但我目前看不到其他方法,不过,当有更聪明的方法时,我们会随时公布这个答案。

作为一种替代解决方案,您可以在未来调用add_done_callback,而根本不使用as_completed。关键是不要保留对未来的引用。所以原问题中的future_to_url列表是个坏主意。

我所做的基本上是:

def do_stuff(future):
res = future.result()  # handle exceptions here if you need to
f = executor.submit(...)
f.add_done_callback(do_stuff)

如果使用标准模块"concurrent.foretures"并希望同时处理数百万个数据,那么一个工作队列将占用所有可用内存。

您可以使用有界池执行器。https://github.com/mowshon/bounded_pool_executor

pip install bounded-pool-executor

示例:

from bounded_pool_executor import BoundedProcessPoolExecutor
from time import sleep
from random import randint
def do_job(num):
sleep_sec = randint(1, 10)
print('value: %d, sleep: %d sec.' % (num, sleep_sec))
sleep(sleep_sec)
with BoundedProcessPoolExecutor(max_workers=5) as worker:
for num in range(10000):
print('#%d Worker initialization' % num)
worker.submit(do_job, num)

dodysw正确地指出,常见的解决方案是将输入分块并将任务块提交给执行器。他还正确地指出,在开始处理下一个区块之前,等待每个区块被完全处理会损失一些性能。

我建议一个更好的解决方案,它将向执行器提供连续的任务流,同时强制执行并行任务的最大数量的上限,以保持低内存占用率。

诀窍是使用concurrent.futures.wait来跟踪已经完成的期货和仍在等待完成的期货:

def load_json_url(url):
try:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
resp = urllib.request.urlopen(req).read().decode('utf8')
return json.loads(resp), None
except Exception as e:
return url, e
MAX_WORKERS = 6
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures_done = set()
futures_notdone = set()
for url in formulary_urls:
futures_notdone.add(executor.submit(load_json_url, url))
if len(futures_notdone) >= MAX_WORKERS:
done, futures_notdone = concurrent.futures.wait(futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
futures_done.update(done)
# Process results.
downloaded_plans = 0
for future in futures_done:
json, exc = future.result()
if exc:
print('%r generated an exception: %s' % (json, exc))
else:
downloaded_plans += 1
for item in data:
if item['rxnorm_id'] == drugid:
for row in item['plans']:
print(row['drug_tier'])
(plansid_dict[row['plan_id']])['drug_tier'] = row['drug_tier']
(plansid_dict[row['plan_id']])['prior_authorization'] = row['prior_authorization']
(plansid_dict[row['plan_id']])['step_therapy'] = row['step_therapy']
(plansid_dict[row['plan_id']])['quantity_limit'] = row['quantity_limit']

当然,您也可以定期处理循环内的结果,以便不时清空futures_done。例如,每当futures_done中的项目数量超过1000(或任何其他符合您需求的数量)时,您都可以这样做。如果您的数据集非常大,并且仅结果就会导致大量内存使用,那么这可能会派上用场。

相关内容

  • 没有找到相关文章