I have looked at this question and talked to ChatGPT/Bing (lol), but I still can't figure this out.
I am trying to make ~7mn requests to an API from a machine with 32 CPUs and load the data into Postgres. My code is basically set up like this:
from aiomultiprocess import Pool

CPUS = 30  # (32 CPUS available)
batch_size = 5000

for i in range(0, len(args_list), batch_size):
    log.info(f"<log message>")
    async with Pool(
        processes=CPUS,
        maxtasksperchild=100,
        childconcurrency=3,
        queuecount=int(CPUS / 3),
    ) as pool:
        await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
--EDIT: adding a redacted version of fetch_api_results, as requested in the comments. It is basically a set of functions that builds the API URL and then makes aiohttp requests recursively until there is no more next_url token in the API response. Here it is:
from aiohttp import request

async def fetch_api_results(*args):
    try:
        result_objects = APIPaginator(*args)
        await result_objects.fetch()
        log.info("uploading data")
        # upload to db function
    except planned_exceptions as e:
        log.warning(e, exc_info=False)
class APIPaginator(object):
    async def query_data(self):
        url = self.api_base + "<str from arg>"
        payload = {"limit": 1000}
        await self.query_all(url, payload)

    async def query_all(self, url, payload):
        try:
            async with request(method="GET", url=url, params=payload) as response:
                log.info(f"status code: {response.status}")
                if response.status == 200:
                    results = await response.json()
                    self.results.append(results)
                    next_url = results.get("next_url")
                    if next_url:
                        await self.query_all(next_url, payload)
                else:
                    response.raise_for_status()
        except:  # (except block omitted)
            ...

    async def fetch(self):
        await self.query_data()
END EDIT --------------
It will run for an hour or two (it is expected to take a day or two) and then it freezes. No errors are thrown. When I keyboard-interrupt it, I see an OSError: [Errno 24] Too many open files error.
I have put the traceback below.
From what I understand, the problem of too many file handles being open has to do with the spawning of new worker processes in the pool. What confuses me is that the docs say that when the maxtasksperchild limit is reached, an old worker process should be killed and a new one spawned. This is meant to prevent memory leaks and, I assumed, to keep exactly this problem from happening.
However, changing the maxtasksperchild parameter makes no difference.
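One way to check whether the worker pipes really are what piles up is to watch the open file-descriptor count of the parent process and its children while the pool is running. A small diagnostic sketch, assuming psutil is installed (it is not part of the original setup), run from the orchestrating process on Linux:

import psutil

def log_fd_counts():
    parent = psutil.Process()  # the orchestrating process
    counts = {parent.pid: parent.num_fds()}
    for child in parent.children(recursive=True):
        counts[child.pid] = child.num_fds()
    print(f"total open fds: {sum(counts.values())} across {len(counts)} processes")

If the total keeps climbing across batches even though workers are being recycled, the descriptors are leaking in the parent (for example, pipes to finished workers never being closed) rather than inside the workers themselves.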
Also, I implemented the batching to effectively kill the pool and start a new one after every 5000 tasks, precisely to prevent an accumulation of file handles. Once the with block closes, the with pool: implementation should effectively terminate everything associated with that pool. But that failed too; nothing changed after implementing the batching approach.
All of this leaves me puzzled. It clearly has to do with the pipes of the newly spawned processes, but I'm not sure what to do about it. Any feedback would be welcome.
A short-term fix that would just extend how long the script runs before failing could be to raise the maximum number of allowed open files (using ulimit -n, per the linked answer). But I worry that this value would eventually be exceeded as well, since this will be a fairly long-running task.
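For what it's worth, the soft limit can also be raised from inside the script itself, up to the hard limit and without root, using the standard-library resource module (POSIX only). A minimal sketch; the printed values are simply whatever the OS reports:

import resource

# current soft/hard limits on open file descriptors for this process
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft}, hard={hard}")

# raise the soft limit as far as the hard limit allows
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))

This is only a stopgap, though; it does not address why the descriptors accumulate in the first place.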
Any help is much appreciated!
Here is the full traceback:
  File "<path-to-file>.py", line 127, in import_data
    await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 136, in results
    return await self.pool.results(self.task_ids)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 312, in results
    await asyncio.sleep(0.005)
  File "/<path-to-env>/lib/python3.11/asyncio/tasks.py", line 639, in sleep
    return await future
  File "/<path-to-env>/3.11.1/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/<path-to-file>/main.py", line 39, in add_all_data
    await import_data(args)
  File "/<path-to-file>/orchestrator.py", line 120, in import_data
    async with Pool(
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 196, in __aexit__
    await self.join()
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 379, in join
    await self._loop
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 229, in loop
    self.processes[self.create_worker(qid)] = qid
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 261, in create_worker
    process.start()
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/core.py", line 153, in start
    return self.aio_process.start()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/context.py", line 288, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/<path-to-env>/lib/python3.11/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/home/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 58, in _launch
    self.pid = util.spawnv_passfds(spawn.get_executable(),
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/util.py", line 451, in spawnv_passfds
    errpipe_read, errpipe_write = os.pipe()
                                  ^^^^^^^^^
OSError: [Errno 24] Too many open files
Move the recursive call that fetches the next URL out of the async with request(...) block. None of the resources held by the underlying request are released until everything has returned back up the way it came:
async def query_all(self, url, payload):
    next_url = None
    try:
        async with request(method="GET", url=url, params=payload) as response:
            log.info(f"status code: {response.status}")
            if response.status == 200:
                results = await response.json()
                self.results.append(results)
                next_url = results.get("next_url")
            else:
                response.raise_for_status()
    except ...:
        ...
    # recurse only after the response context (and its connection) is closed
    if next_url:
        await self.query_all(next_url, payload)
If the problem persists, change the nested call to query_all into its own piece of work on a FIFO queue kept on the instance, so that the call from query_data waits for all of the child calls to query_all; a sketch of that idea follows below.
That should improve things there; without a reproducible example it is hard to be sure.
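A rough sketch of the queue-based variant, assuming the same redacted APIPaginator attributes from the question (self.api_base, self.results). The url_queue, the query_one helper, and the __init__ shown here are illustrative, not part of the original code, and the queue is drained in place inside query_data rather than in a separate task:

import asyncio
from aiohttp import request

class APIPaginator(object):
    def __init__(self, api_base):
        self.api_base = api_base
        self.results = []
        self.url_queue = asyncio.Queue()  # FIFO queue of pages still to fetch

    async def query_data(self):
        url = self.api_base + "<str from arg>"
        payload = {"limit": 1000}
        await self.url_queue.put(url)
        # drain the queue iteratively instead of recursing, so query_data
        # only returns once every child fetch has finished
        while not self.url_queue.empty():
            next_url = await self.url_queue.get()
            await self.query_one(next_url, payload)
            self.url_queue.task_done()

    async def query_one(self, url, payload):
        # one request per call; the connection is released as soon as this
        # block exits, before the next page is ever requested
        async with request(method="GET", url=url, params=payload) as response:
            if response.status == 200:
                results = await response.json()
                self.results.append(results)
                next_url = results.get("next_url")
                if next_url:
                    await self.url_queue.put(next_url)
            else:
                response.raise_for_status()

    async def fetch(self):
        await self.query_data()

Since each page is fetched in its own call to query_one, at most one response is held open per paginator at any time, instead of one per level of recursion.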