我有一个Python函数,它调用C函数的包装器(我无法更改)。大多数时候,C 函数非常快,但是当它失败时,调用就会永远挂起。为了缓解这种情况,我使用 multiprocessing
使调用超时:
pool = multiprocessing.Pool(processes=4)
try:
res = pool.apply_async(my_dangerous_cpp_function, args=(bunch, of, vars))
return res.get(timeout=1.)
except multiprocessing.TimeoutError:
terminate_pool(pool)
pool = multiprocessing.Pool(processes=4)
当被调用的函数不回答任何信号时,如何终止池?
如果我用pool.terminate()
替换terminate_pool(pool)
,那么对pool.terminate()
的调用也会挂起。相反,我目前正在将 SIGKILL 发送到所有子进程:
def terminate_pool(pool):
for p in pool._pool:
os.kill(p.pid, 9)
pool.close() # ok, doesn't hang
#pool.join() # not ok, hangs forever
这样,挂起的子进程就不会消耗 100% 的 CPU,但是我不能调用pool.terminate()
或pool.join()
(它们挂起),所以我只是留下池对象并创建一个新的对象。即使他们收到了 SIGKILL,子进程仍然打开,所以我的 Python 进程数量从未停止增加......
有没有办法一劳永逸地消灭池及其所有子进程?
标准multiprocessing.Pool
不是为处理工作线程超时而设计的。
卵石处理池支持超时任务。
from pebble import process, TimeoutError
with process.Pool() as pool:
task = pool.schedule(function, args=[1,2], timeout=5)
try:
result = task.get()
except TimeoutError:
print "Task: %s took more than 5 seconds to complete" % task