多处理.如果子线程导致分段错误,池将挂起



我想使用multiprocessing.Pool并行应用一个函数。问题是,如果一个函数调用触发分段错误,池将永远挂起。有没有人知道我如何做一个池,检测什么时候发生这样的事情,并引发一个错误?

下面的例子展示了如何复制它(需要scikit-learn> 0.14)

import numpy as np
from sklearn.ensemble import gradient_boosting
import time
from multiprocessing import Pool
class Bad(object):
    tree_ = None

def fit_one(i):
    if i == 3:
        # this will segfault                                                    
        bad = np.array([[Bad()] * 2], dtype=np.object)
        gradient_boosting.predict_stages(bad,
                                         np.random.rand(20, 2).astype(np.float32),
                                         1.0, np.random.rand(20, 2))
    else:
        time.sleep(1)
    return i

pool = Pool(2)
out = pool.imap_unordered(fit_one, range(10))
# we will never see 3
for o in out:
    print o

如评论中所述,如果您使用concurrent.Futures.ProcessPoolExecutor而不是multiprocessing.Pool,则此操作仅在Python 3中有效。

如果你被困在Python 2上,我发现最好的选择是在Pool.apply_asyncPool.map_async返回的结果对象上使用timeout参数。例如:

pool = Pool(2)
out = pool.map_async(fit_one, range(10))
for o in out:
    print o.get(timeout=1000)  # allow 1000 seconds max

只要有子进程完成任务所需时间的上限,就可以使用

这是Python中的一个已知错误,issue #22393。没有任何有意义的解决方案,只要你使用multiprocessing.pool,直到它被修复。该链接提供了一个补丁,但它还没有集成到主版本中,所以Python的稳定版本还没有修复这个问题。

与其使用Pool().imap(),不如自己用Process()手动创建子进程。我打赌返回的对象可以让你得到任何孩子的生活状态。你会知道他们是否挂断了。

我还没有运行您的示例,看看它是否可以处理错误,但尝试并发期货。只需将my_function(i)替换为fit_one(i)。保持__name__=='__main__':结构。并行期货似乎需要这个。下面的代码是在我的机器上测试的,所以希望能在你的机器上直接工作。

import concurrent.futures
def my_function(i):
    print('function running')
    return i
def run():
    number_processes=4
    executor = concurrent.futures.ProcessPoolExecutor(number_processes)
    futures = [executor.submit(my_function,i) for i in range(10)]
    concurrent.futures.wait(futures)
    for f in futures:
        print(f.result())
if __name__ == '__main__':
    run()

相关内容

  • 没有找到相关文章