我想使用multiprocessing.Pool并行应用一个函数。问题是,如果一个函数调用触发分段错误,池将永远挂起。有没有人知道我如何做一个池,检测什么时候发生这样的事情,并引发一个错误?
下面的例子展示了如何复制它(需要scikit-learn> 0.14)
import numpy as np
from sklearn.ensemble import gradient_boosting
import time
from multiprocessing import Pool
class Bad(object):
tree_ = None
def fit_one(i):
if i == 3:
# this will segfault
bad = np.array([[Bad()] * 2], dtype=np.object)
gradient_boosting.predict_stages(bad,
np.random.rand(20, 2).astype(np.float32),
1.0, np.random.rand(20, 2))
else:
time.sleep(1)
return i
pool = Pool(2)
out = pool.imap_unordered(fit_one, range(10))
# we will never see 3
for o in out:
print o
如评论中所述,如果您使用concurrent.Futures.ProcessPoolExecutor
而不是multiprocessing.Pool
,则此操作仅在Python 3中有效。
如果你被困在Python 2上,我发现最好的选择是在Pool.apply_async
和Pool.map_async
返回的结果对象上使用timeout
参数。例如:
pool = Pool(2)
out = pool.map_async(fit_one, range(10))
for o in out:
print o.get(timeout=1000) # allow 1000 seconds max
只要有子进程完成任务所需时间的上限,就可以使用
这是Python中的一个已知错误,issue #22393。没有任何有意义的解决方案,只要你使用multiprocessing.pool
,直到它被修复。该链接提供了一个补丁,但它还没有集成到主版本中,所以Python的稳定版本还没有修复这个问题。
与其使用Pool().imap()
,不如自己用Process()
手动创建子进程。我打赌返回的对象可以让你得到任何孩子的生活状态。你会知道他们是否挂断了。
我还没有运行您的示例,看看它是否可以处理错误,但尝试并发期货。只需将my_function(i)替换为fit_one(i)。保持__name__=='__main__':
结构。并行期货似乎需要这个。下面的代码是在我的机器上测试的,所以希望能在你的机器上直接工作。
import concurrent.futures
def my_function(i):
print('function running')
return i
def run():
number_processes=4
executor = concurrent.futures.ProcessPoolExecutor(number_processes)
futures = [executor.submit(my_function,i) for i in range(10)]
concurrent.futures.wait(futures)
for f in futures:
print(f.result())
if __name__ == '__main__':
run()