我正在编写一个脚本,它读取一堆文件,然后并行处理所有这些文件中的行。
我的问题是,如果脚本无法打开某些文件,它的行为会很奇怪。如果它是列表中较晚的文件之一,那么它会处理较早的文件,并在到达坏文件时报告异常。但是,如果它无法打开列表中的第一个文件,那么它将不进行任何处理,也不会报告异常。
如何使脚本报告所有异常,无论它们在列表中的哪个位置?
关键问题似乎是pool.imap()
的块大小。如果异常发生在提交第一个区块之前,它将以静默方式失败。
这里有一个小脚本来重现这个问题:
from multiprocessing.pool import Pool
def prepare():
for i in range(5):
yield i+1
raise RuntimeError('foo')
def process(x):
return x
def test(chunk_size):
pool = Pool(10)
n = raised = None
try:
for n in pool.imap(process, prepare(), chunksize=chunk_size):
pass
except RuntimeError as ex:
raised = ex
print(chunk_size, n, raised)
def main():
print('chunksize n raised')
for chunk_size in range(1, 10):
test(chunk_size)
if __name__ == '__main__':
main()
prepare()
函数生成五个整数,然后引发一个异常。该生成器被传递给块大小从1到10的pool.imap()
。然后,它打印出块大小、接收到的结果数以及引发的任何异常。
chunksize n raised
1 5 foo
2 4 foo
3 3 foo
4 4 foo
5 5 foo
6 None None
7 None None
8 None None
9 None None
您可以看到异常被正确地报告,直到区块大小增加到足以在提交第一个区块之前发生异常为止。然后它会无声地失败,并且不会返回任何结果。
如果我在自己方便的系统上用Python 2.7.13和3.5.4运行这个(为了py2k和py3k的交叉兼容性,我稍微修改了一下),我会得到:
$ python2 --version
Python 2.7.13
$ python2 mptest.py
chunksize n raised
1 5 foo
2 4 foo
3 3 foo
4 4 foo
5 5 foo
6 None None
7 None None
8 None None
9 None None
$ python3 --version
Python 3.5.4
$ python3 mptest.py
chunksize n raised
1 5 foo
2 4 foo
3 3 foo
4 4 foo
5 5 foo
6 None foo
7 None foo
8 None foo
9 None foo
我认为,对于块大小>5,它会失败(因此打印None
)这一事实并不奇怪,因为没有一个池进程能够获得六个参数,因为通过调用mptest
生成的生成器只能被调用5次。
令人惊讶的是,Python2.7.9对大于5的块大小的异常表示None
,而Python3.5对异常表示foo
。
这是在提交794623bdb2中修复的问题#28699。该修复程序显然已被后移植到Python 3.5.4,但没有移植到Python 2.7.9,也没有移植到您自己的Python 3版本。