运行时错误:无法使用线程池启动新线程,并且只有少数线程



在 Python 中使用multiprocessing.pool中的ThreadPool时,我有时会遇到 RuntimeError(我会说不超过 1%(。

我读过,如果尝试打开数百个线程,就会发生这种情况。就我而言,它应该是最多 4 个线程,所以我有点困惑为什么会发生这种情况。

我以前一直在完全相同的环境中使用ThreadPool3 个线程,但从未出现过错误。

我的代码是:

import time
from multiprocessing.pool import ThreadPool
while True:
qty_fetched = 6
time.sleep(random_secs(0.5))
pending_updates = fetch_pending_updates(qty_fetched) #list of dicts
if pending_updates:
prio = pending_updates[0]['prio'] #variable number between 0 and 4 (edited from original question)
if prio > 3:
qty_threads = 1
elif prio == 0 or prio == 1:
qty_threads = 4
else:
qty_threads = 3
pool = ThreadPool(qty_threads)
pool.map(self.run_update_NEW, pending_updates) #a list of 6 dicts will be given to the pool of 1, 3 or 4 threads
else:
time.sleep(2)

和回溯:

...
pool = ThreadPool(qty_threads) 
File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 789, in __init__ 
Pool.__init__(self, processes, initializer, initargs) 
File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 192, in __init__ 
self._task_handler.start() 
File "/app/.heroku/python/lib/python3.6/threading.py", line 846, in start 
_start_new_thread(self._bootstrap, ()) 
RuntimeError: can't start new thread 

有什么问题的想法吗?


尝试:

从这里我了解了ThreadPoolExecutor.

我决定试一试:

import time
from concurrent.futures import ThreadPoolExecutor
while True:
qty_fetched = 6
time.sleep(random_secs(0.5))
pending_updates = fetch_pending_updates(qty_fetched) #list of dicts
if pending_updates:
prio = 2 #some variable number between 0 and 4
if prio > 3:
qty_threads = 1
elif prio == 0 or prio == 1:
qty_threads = 4
else:
qty_threads = 3
#the following lines changed
with ThreadPoolExecutor(max_workers=qty_threads) as e:
for pu in pending_updates:
e.submit(self.run_update_NEW, pu)
else:
time.sleep(2)

我将不断更新帖子,解释这是否有效。

我可以在您的代码中看到的一个问题是,您有一个无限while True循环,您可以在其中创建池,但从未真正关闭它。您现在继续创建池,但由于您永远不会关闭并加入池,因此"旧"线程很可能会挂在那里,几分之一秒后您创建了更多线程。我的猜测是你最终会耗尽你的资源,并在某个地方达到进程或内核限制。

我会将池创建移到 while 循环之外,并继续在您的循环中使用相同的池。这就是池的整个想法 - 让进程或线程等待工作出现,在启动重复性任务时消除进程/线程创建开销。

如果有理由重新启动池(我不知道那可能是什么 - 如果您需要偶尔更新您的工作人员,您可以在池声明中使用maxtasksperchild(,那么至少正确关闭旧池,因为您不会再给它提供任何工作。

相关内容

最新更新