Processing millions of records with asyncio causes a memory error



I am getting the error below:

Fatal Python error: Cannot recover from MemoryErrors while normalizing exceptions.
Current thread 0x0000ffff88de5010 (most recent call first):
  File "test.py", line 173 in wrap_get_fuzzy_match
  File "/usr/lib64/python3.7/asyncio/events.py", line 88 in _run
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 1786 in _run_once
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 541 in run_forever
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 574 in run_until_complete
  File "test.py", line 224 in <module>
Aborted

import asyncio
from itertools import islice
from aiohttp import ClientSession

# API, LOGGER, and get_best_match are defined elsewhere in the script.

async def get_valuation(url, params, api_header, session, semaphore):
    async with semaphore:
        async with session.get(url, headers=api_header) as response:
            status_code = response.status
            try:
                if status_code != 200:
                    mmr = {params: 'not found' + ',' + ' ' + str(status_code)}
                else:
                    asynch_response = await response.json()
                    mmr = await get_best_match(params, asynch_response, str(status_code))
                return mmr
            except Exception as ex:
                LOGGER.error(f"Error in get valuation and error was {ex}")
                return ex

async def wrap_get_fuzzy_match(func, *args, **kwargs):
    try:
        return await func(*args, **kwargs)
    except Exception as err:
        LOGGER.error(f"Error in wrap_get_fuzzy_match and error was {err}")
        return err
async def main(headers, file):
    tasks = []
    sema = asyncio.Semaphore(500)
    BATCH_SIZE = 1000000
    async with ClientSession() as session:
        with open(file) as f:
            while True:
                batch = [line.strip('\n') for line in islice(f, BATCH_SIZE)]
                if not batch:
                    break
                for param in batch:
                    task = asyncio.ensure_future(wrap_get_fuzzy_match(
                        get_valuation,
                        url=API + param,
                        params=param,
                        api_header=headers,
                        session=session,
                        semaphore=sema,
                    ))
                    tasks.append(task)
        responses = await asyncio.gather(*tasks)
        return responses
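
As far as I can tell, the semaphore only caps how many requests are in flight. asyncio.ensure_future still creates one Task object (plus its coroutine frame) per input line before gather collects a single result, so with millions of lines the heap fills up regardless of the concurrency limit. A minimal self-contained sketch of the eager task build-up (noop and demo are made-up names, not from my script):

import asyncio

async def noop(sem):
    # Stand-in for get_valuation: hold the semaphore briefly.
    async with sem:
        await asyncio.sleep(0)

async def demo(n=100_000):
    sem = asyncio.Semaphore(500)
    # Every ensure_future() call materializes a Task immediately,
    # no matter what the semaphore allows to run concurrently.
    tasks = [asyncio.ensure_future(noop(sem)) for _ in range(n)]
    print(f"{len(tasks)} live tasks before any result is collected")
    await asyncio.gather(*tasks)

asyncio.run(demo())

At roughly a kilobyte or more per pending task, a few million input lines put the outstanding tasks alone in the gigabyte range, which matches the crash above.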

I got past this by passing the data in chunks and calling the main function in a loop:

import asyncio
from aiohttp import ClientSession
from async_timeout import timeout

# API, LOGGER, INPUT, hdr, and get_best_match are defined elsewhere.

async def get_valuation(url, params, api_header, session, semaphore):
    """
    Call fuzzy match api
    :param url:
    :param params:
    :param api_header:
    :param session:
    :param semaphore:
    :return:
    """
    async with semaphore:
        async with session.get(url, headers=api_header) as response:
            status_code = response.status
            try:
                if status_code != 200:
                    mmr = {params: 'not found' + ',' + ' ' + str(status_code)}
                else:
                    asynch_response = await response.json()
                    mmr = await get_best_match(params, asynch_response, str(status_code))
                return mmr
            except Exception as ex:
                LOGGER.error(f"Error in get valuation and error was {ex}")
                return ex

async def wrap_get_fuzzy_match(func, *args, **kwargs):
    try:
        return await func(*args, **kwargs)
    except Exception as err:
        LOGGER.error(f"Error in wrap_get_fuzzy_match and error was {err}")
        return err

async def main(params, headers):
    tasks = []
    sema = asyncio.Semaphore(100)
    async with ClientSession() as session:
        async with timeout(None):
            LOGGER.info(f"Number of urls to process: {len(params)}")
            for param in params:
                task = asyncio.ensure_future(wrap_get_fuzzy_match(
                    get_valuation,
                    url=API,
                    params=param,
                    api_header=headers,
                    session=session,
                    semaphore=sema,
                ))
                tasks.append(task)
            responses = await asyncio.gather(*tasks)
            return responses

if __name__ == '__main__':
    LOGGER.info("Start Processing")
    BATCH_SIZE = <size of each batch>
    loop = asyncio.get_event_loop()
    try:
        with open(INPUT) as file:
            inputs = file.readlines()
    except IOError:
        LOGGER.exception("Unable to read valuation input file")
        raise
    chunked_list = list(divide_chunks(big_list=inputs, chunk_size=BATCH_SIZE))
    LOGGER.info(f"Chunked size- {len(chunked_list)}")
    batch_counter = 0
    for params in chunked_list:
        batch_counter += 1
        LOGGER.info(f"Starting batch number [{batch_counter}] out of [{len(chunked_list)}]")
        results = loop.run_until_complete(
            asyncio.ensure_future(
                main(params=params, headers=hdr)
            )
        )

    LOGGER.info("Processing Completed!!")
    loop.close()
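
divide_chunks isn't shown above; given the call site (keyword arguments big_list and chunk_size), it is presumably a simple slicing generator along these lines (the body here is my reconstruction, not the original helper):

def divide_chunks(big_list, chunk_size):
    # Yield successive chunk_size-sized slices of big_list, matching the
    # divide_chunks(big_list=..., chunk_size=...) call in the main block.
    for i in range(0, len(big_list), chunk_size):
        yield big_list[i:i + chunk_size]

One caveat with the loop above: results is overwritten on every batch, so only the last batch's responses survive. If all of them are needed, extend a list with each run_until_complete result, or persist each batch as it completes, which is what actually keeps memory bounded.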

Latest update