在 Python 中使用multiprocessing.pool
中的ThreadPool
时,我有时会遇到 RuntimeError(我会说不超过 1%(。
我读过,如果尝试打开数百个线程,就会发生这种情况。就我而言,它应该是最多 4 个线程,所以我有点困惑为什么会发生这种情况。
我以前一直在完全相同的环境中使用ThreadPool
3 个线程,但从未出现过错误。
我的代码是:
import time
from multiprocessing.pool import ThreadPool
while True:
qty_fetched = 6
time.sleep(random_secs(0.5))
pending_updates = fetch_pending_updates(qty_fetched) #list of dicts
if pending_updates:
prio = pending_updates[0]['prio'] #variable number between 0 and 4 (edited from original question)
if prio > 3:
qty_threads = 1
elif prio == 0 or prio == 1:
qty_threads = 4
else:
qty_threads = 3
pool = ThreadPool(qty_threads)
pool.map(self.run_update_NEW, pending_updates) #a list of 6 dicts will be given to the pool of 1, 3 or 4 threads
else:
time.sleep(2)
和回溯:
...
pool = ThreadPool(qty_threads)
File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 789, in __init__
Pool.__init__(self, processes, initializer, initargs)
File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 192, in __init__
self._task_handler.start()
File "/app/.heroku/python/lib/python3.6/threading.py", line 846, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
有什么问题的想法吗?
尝试:
从这里我了解了ThreadPoolExecutor
.
我决定试一试:
import time
from concurrent.futures import ThreadPoolExecutor
while True:
qty_fetched = 6
time.sleep(random_secs(0.5))
pending_updates = fetch_pending_updates(qty_fetched) #list of dicts
if pending_updates:
prio = 2 #some variable number between 0 and 4
if prio > 3:
qty_threads = 1
elif prio == 0 or prio == 1:
qty_threads = 4
else:
qty_threads = 3
#the following lines changed
with ThreadPoolExecutor(max_workers=qty_threads) as e:
for pu in pending_updates:
e.submit(self.run_update_NEW, pu)
else:
time.sleep(2)
我将不断更新帖子,解释这是否有效。
我可以在您的代码中看到的一个问题是,您有一个无限while True
循环,您可以在其中创建池,但从未真正关闭它。您现在继续创建池,但由于您永远不会关闭并加入池,因此"旧"线程很可能会挂在那里,几分之一秒后您创建了更多线程。我的猜测是你最终会耗尽你的资源,并在某个地方达到进程或内核限制。
我会将池创建移到 while 循环之外,并继续在您的循环中使用相同的池。这就是池的整个想法 - 让进程或线程等待工作出现,在启动重复性任务时消除进程/线程创建开销。
如果有理由重新启动池(我不知道那可能是什么 - 如果您需要偶尔更新您的工作人员,您可以在池声明中使用maxtasksperchild
(,那么至少正确关闭旧池,因为您不会再给它提供任何工作。