我遇到了一些我无法理解的 Python 多处理行为......
例如:
from multiprocessing import Pool
import time
import sys
def f(x):
time.sleep(10)
print(x)
return x * x
def f2(x, f):
time.sleep(10)
print(x, file=f)
return x * x
if __name__ == '__main__':
p = Pool(5)
for t in range(10):
p.apply_async(f, args=(t,))
p.close()
p.join() # Here it blocks and prints the number, which is normal.
p = Pool(5)
for t in range(10):
p.apply_async(f2, args=(t, sys.stdout))
p.close()
p.join() # Here it does not block and nothing happends(no output at all)...
输出为:
3
1
0
2
4
5
9
6
7
8
我知道在使用多处理和apply_async
时,我们必须使用共享变量之类的东西传递给函数,但是如果我将普通变量传递给apply_async
中使用的函数会发生什么?
multiprocessing.Pool
在单独的进程中执行您的逻辑。如果逻辑引发并发生异常,Pool
会将其返回给调用方。
在您的代码中,您没有收集函数的输出,因此您不会注意到真正的问题。
尝试按如下方式修改代码:
p = Pool(5)
for t in range(10):
task = p.apply_async(f2, args=(t, sys.stdout))
task.get()
然后,您将获得在f2
中引发的实际异常:
Traceback (most recent call last):
File "asd.py", line 24, in <module>
p.apply_async(f2, args=(t, sys.stdout)).get()
File "/usr/lib/python3.5/multiprocessing/pool.py", line 608, in get
raise self._value
File "/usr/lib/python3.5/multiprocessing/pool.py", line 385, in _handle_tasks
put(task)
File "/usr/lib/python3.5/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
事实证明,sys.stdout
是不可挑的。在这种情况下,这不是一个问题,因为每个进程sys.stdout
唯一的。您可以避免将其传递到函数上,而只需在f2
中按原样使用它。