获取Oserror:在使用多处理的代码上运行Longish Test Suite时,打开文件太多



运行Python 2.7,开发环境是OS X,但生产是Linux。

我有一些代码正在尝试通过多处理加速,并且我的工作正常,并观察到了所需的理论加速。然后,我去了上面的测试套件,经过几次测试,开始在所有后续测试中获取上述Oserror。如果我从开始遇到错误的程度运行测试,则其中一些数量通过,然后再次获得该错误。这是相当合乎逻辑的,只是一项理智检查。

试图找出出了什么问题,我用打印的__builtin__openclose呼叫(以下建议(以下建议)

import __builtin__
import traceback
import sys
openfiles = set()
oldfile = __builtin__.file
class newfile(oldfile):
    def __init__(self, *args):
        self.x = args[0]
        print "### OPENING %s ###" % str(self.x)
        traceback.print_stack(limit=20)
        print
        sys.stdout.flush()
        oldfile.__init__(self, *args)
        openfiles.add(self)
    def close(self):
        print "### CLOSING %s ###" % str(self.x)
        oldfile.close(self)
        openfiles.remove(self)
oldopen = __builtin__.open
def newopen(*args):
    return newfile(*args)
__builtin__.file = newfile
__builtin__.open = newopen

我只看到数百行### OPENING /dev/null ###

当我为完成相同任务的代码做同样的事情时,但是没有多处理,我就没有这样的文件连接,因此可以理解多处理在这里是故障。traceback调用支持这一点,这表明罪魁祸首在这里:

  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 250, in _bootstrap
    sys.stdin = open(os.devnull)

在此处发布multiprocessing::process.py::_bootstrap函数的代码,以防万一它有用:

def _bootstrap(self):
    from . import util
    global _current_process
    try:
        self._children = set()
        self._counter = itertools.count(1)
        try:
            sys.stdin.close()
            sys.stdin = open(os.devnull)
        except (OSError, ValueError):
            pass
        _current_process = self
        util._finalizer_registry.clear()
        util._run_after_forkers()
        util.info('child process calling self.run()')
        try:
            self.run()
            exitcode = 0
        finally:
            util._exit_function()
    except SystemExit, e:
        if not e.args:
            exitcode = 1
        elif isinstance(e.args[0], int):
            exitcode = e.args[0]
        else:
            sys.stderr.write(str(e.args[0]) + 'n')
            sys.stderr.flush()
            exitcode = 1
    except:
        exitcode = 1
        import traceback
        sys.stderr.write('Process %s:n' % self.name)
        sys.stderr.flush()
        traceback.print_exc()
    util.info('process exiting with exitcode %d' % exitcode)
    return exitcode

和,对于它的价值,我正在使用看起来像这样的代码来调用多处理:

num_cpus = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=num_cpus)
num_per_job = len(input_data) / num_cpus + 1
chunks = [input_data[num_per_job*i:num_per_job*(i+1)] for i in range(num_cpus)]
# TODO: ^^^ make this a list of generators
data = pool.map(get_output_from_input, chunks)
return itertools.chain.from_iterable(data)

所以,问题:这是multiprocessing中的错误,还是我做的事情很大?我真的很欢迎借口,下周花挖掘multiprocessing代码并弄清楚它的工作原理,但是我很难说服更高的上升,这是我时间的有效利用。感谢任何经验帮助的人!

您需要关闭池以终止子进程并释放用于与他们通信的管道。使用contextlib.closing执行此操作,这样您就不必担心会跳过近距离的例外。closing将在with块末端关闭池,包括当它出现时。因此,您无需打电话给自己。

另外, Pool.map块块请求,因此您不必自己做。我删除了该代码,但是get_output_from_input签名可能不正确(每个输入项将调用一次,而不是一次输入项目列表),因此您可能需要进行一些修复。

import contextlib
num_cpus = multiprocessing.cpu_count()
with contextlib.closing(multiprocessing.Pool(processes=num_cpus)) as pool:
    data = pool.map(get_output_from_input, input_data)
    return itertools.chain.from_iterable(data)

最新更新