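While experimenting with the multiprocessing Pool module, I noticed that it stops working as soon as I load/open any kind of file. The code below works as expected. When I uncomment the two commented-out lines in __init__ (lines 8-9 of the script), the script skips the pool.apply_async call and loopingTest never runs.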
import time
from multiprocessing import Pool


class MultiClass:
    def __init__(self):
        file = 'test.txt'
        # with open(file, 'r') as f:  # This is the culprit
        #     self.d = f
        self.n = 50000000
        self.cases = ['1st time', '2nd time']
        self.multiProc(self.cases)
        print("It's done")

    def loopingTest(self, cases):
        print(f"looping start for {cases}")
        n = self.n
        while n > 0:
            n -= 1
        print(f"looping done for {cases}")

    def multiProc(self, cases):
        test = False
        pool = Pool(processes=2)
        if not test:
            for i in cases:
                pool.apply_async(self.loopingTest, (i,))
        pool.close()
        pool.join()


if __name__ == '__main__':
    start = time.time()
    w = MultiClass()
    end = time.time()
    print(f'Script finished in {end - start} seconds')
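You see this behavior because the apply_async call fails once you save the open file object (self.d) on your instance. When you call apply_async(self.loopingTest, ...), Python has to pickle self.loopingTest to send it to the worker process, and pickling a bound method also means pickling self. With a file object stored as an attribute of self, pickling fails because file objects cannot be pickled. apply_async does not raise that error in the parent process; it stores it in the AsyncResult it returns, which the code above discards, so the task silently never runs. You can see the failure for yourself by using apply instead of apply_async in the sample code, or by calling get() on the returned result. You will get an error like this: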
Traceback (most recent call last):
  File "a.py", line 36, in <module>
    w = MultiClass()
  File "a.py", line 12, in __init__
    self.multiProc(self.cases)
  File "a.py", line 28, in multiProc
    out.get()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
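The same failure can be reproduced without a pool, since multiprocessing relies on pickle to ship the task. The sketch below is only an illustration: Holder, work, and can_pickle are hypothetical names that do not appear in the question, and a temporary file stands in for test.txt.

```python
import os
import pickle
import tempfile


class Holder:
    """Hypothetical stand-in for MultiClass: optionally keeps a file object on self."""

    def __init__(self, path=None):
        self.n = 3
        if path is not None:
            with open(path, "r") as f:
                self.d = f  # same pattern as the commented-out lines in the question

    def work(self, case):
        return f"{case}: {self.n}"


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False if it raises."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Create a throwaway file so the example does not depend on test.txt existing.
fd, tmp_path = tempfile.mkstemp(suffix=".txt")
os.close(fd)

plain = Holder()
with_file = Holder(tmp_path)

# Pickling a bound method pickles the instance it is bound to.
plain_ok = can_pickle(plain.work)
# Here the instance carries a TextIOWrapper, so pickling is expected to fail.
with_file_ok = can_pickle(with_file.work)

os.remove(tmp_path)
print(plain_ok, with_file_ok)
```

Note that the file is already closed when pickling is attempted, because the with block has exited; the file object is rejected by pickle whether it is open or closed.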
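You need to change your code so that it either avoids saving the file object on self and creates it only inside the worker method (if that is where it is needed), or uses the tools Python provides to control how your class is pickled and unpickled (__getstate__ and __setstate__). Depending on the use case, you can also turn the method passed to apply_async into a top-level function, so that self does not need to be pickled at all.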
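As a rough sketch of two of these options, the code below shows a class that leaves its file object out of its pickled state, plus a top-level function that takes only plain data. MultiClassSafe and the module-level looping_test are hypothetical names introduced for illustration, os.devnull stands in for test.txt, and a plain pickle round trip is used in place of a real pool to approximate what happens when a task is sent to a worker.

```python
import os
import pickle


def looping_test(n, case):
    """Top-level worker: receives only plain data, so no instance is pickled."""
    while n > 0:
        n -= 1
    return f"looping done for {case}"


class MultiClassSafe:
    """Hypothetical variant of MultiClass that excludes its file object from pickling."""

    def __init__(self, path):
        self.path = path
        self.n = 1000
        self.d = open(path, "r")  # unpicklable attribute, kept only in the parent

    def __getstate__(self):
        state = self.__dict__.copy()
        state["d"] = None  # drop the file object; a worker could reopen self.path
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

    def looping_test(self, case):
        return looping_test(self.n, case)


obj = MultiClassSafe(os.devnull)  # os.devnull is an assumption so no test.txt is needed

# Round-trip the bound method, roughly as sending a task to a worker would.
restored_method = pickle.loads(pickle.dumps(obj.looping_test))
method_result = restored_method("1st time")
restored_has_file = restored_method.__self__.d is not None

# The top-level function and its plain arguments pickle without touching obj at all.
func, args = pickle.loads(pickle.dumps((looping_test, (obj.n, "2nd time"))))
func_result = func(*args)

obj.d.close()
print(method_result, func_result, restored_has_file)
```

With a real pool, the second option would correspond to a call such as pool.apply_async(looping_test, (self.n, case)). Keeping the returned AsyncResult and calling get() on it also surfaces any worker-side or pickling error instead of dropping it silently.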