Python程序:
import multiprocessing
import time


class Application:  # one worker process per CPU core, all stopped through one shared Event

    def __init__(self):  # builds the stop event and the (not yet started) worker processes
        self._event = multiprocessing.Event()
        self._processes = [
            multiprocessing.Process(target=self._worker)  # bound method: pickling it pickles self
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):  # child-process loop: print the process name once per second until stopped
        while not self._event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):  # starts each worker; with 'spawn' the second start() raises TypeError
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):  # sets the stop event, then waits for every worker to finish
        self._event.set()
        for process in self._processes:
            process.join()


if __name__ == '__main__':
    application = Application()
    application.start()
    time.sleep(3)
    application.stop()
其输出:
starting
starting
Traceback (most recent call last):
File "/Users/maggyero/Desktop/application.py", line 31, in <module>
application.start()
File "/Users/maggyero/Desktop/application.py", line 21, in start
process.start()
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
reduction.dump(process_obj, fp)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
在函数`Application.__init__`中,每次调用`multiprocessing.Process(target=self._worker)`都会以实例方法`self._worker`作为`target`参数来初始化一个`multiprocessing.Process`实例。`self._worker`绑定到`self`,而`self`拥有实例属性`self._processes`。
在函数`Application.start`中,每次调用`process.start()`都会序列化`target`参数,因而也会序列化`self._processes`。`self._processes`是一个`multiprocessing.Process`实例的列表,这些实例最初都尚未启动。第一次调用`process.start()`可以顺利启动该列表中的第一个`multiprocessing.Process`实例,但第二次调用`process.start()`会失败,因为此时被序列化的列表中已经包含一个已启动的实例。
因此,已启动的`multiprocessing.Process`实例无法被序列化。如何解决这个问题?
问题的根源在于:`multiprocessing.Process`实例的`start`方法会把它的`_popen`实例属性设置为一个`multiprocessing.popen_*.Popen`实例。该`Popen`实例在初始化时会执行以下两个步骤(以及其他步骤):
- 对于`multiprocessing.popen_spawn_posix.Popen`实例、`multiprocessing.popen_spawn_win32.Popen`实例或`multiprocessing.popen_forkserver.Popen`实例,但不包括`multiprocessing.popen_fork.Popen`实例(即启动方法为`'spawn'`或`'forkserver'`,而不是`'fork'`),它会序列化该`multiprocessing.Process`实例,并将其写入父进程用来与子进程通信的管道一端,以便子进程能够执行该`multiprocessing.Process`实例的`run`方法。
- 它将其`finalizer`实例属性设置为一个`multiprocessing.util.Finalize`实例,后者又将自身的`_weakref`实例属性设置为一个`weakref.ref`实例,用于在解释器退出时关闭父进程用来与子进程通信的管道一端。换句话说,它使该`multiprocessing.Process`实例持有一个弱引用。
因此,如果一个`multiprocessing.Process`实例持有对某个已启动的`multiprocessing.Process`实例的引用,那么它就间接持有一个弱引用(第 2 点);于是启动它会失败,因为启动时需要序列化它(第 1 点),其中的弱引用也会被一并序列化,而弱引用不可序列化:
import multiprocessing
if __name__ == '__main__':
    # Only 'spawn' and 'forkserver' pickle the Process object inside start().
    multiprocessing.set_start_method('spawn')  # or 'forkserver' but not 'fork'
    process_a = multiprocessing.Process()
    process_b = multiprocessing.Process()
    # process_b now references process_a, so pickling process_b pickles process_a too.
    process_b.foo = process_a
    process_a.start()  # creates process_a._popen.finalizer._weakref
    process_b.start()  # TypeError: cannot pickle 'weakref' object
显示序列化问题的最小 Python 程序:
import pickle
import weakref
# Minimal reproduction: a weakref.ref object cannot be pickled, so this call raises.
pickle.dumps(weakref.ref(int)) # TypeError: cannot pickle 'weakref' object
避免序列化问题的两种解决方法:
- 要么把`target`参数改为`classmethod`,这样它就不会绑定到`self`(因而也不会牵连实例属性`self._processes`):
class Application:
    """Run one worker process per CPU core until ``stop`` is called.

    The worker is a classmethod, so the ``target`` given to each process is
    bound to the class rather than to an instance. Pickling the target
    therefore does not pickle ``self._processes``.
    """

    def __init__(self):
        """Create the shared stop event and the not-yet-started workers."""
        self._event = multiprocessing.Event()
        # The event is passed explicitly through ``args`` because the
        # classmethod worker has no access to instance attributes.
        self._processes = [
            multiprocessing.Process(target=self._worker, args=(self._event,))
            for _ in range(multiprocessing.cpu_count())]

    @classmethod
    def _worker(cls, event):
        """Print the current process name once per second until *event* is set.

        Args:
            event: Object with an ``is_set()`` method; the loop ends once it
                returns true.
        """
        # Poll the event received as an argument: the class itself has no
        # ``_event`` attribute, so reading it through ``cls`` would raise
        # AttributeError.
        while not event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        """Start every worker process, printing 'starting' before each one."""
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        """Set the stop event, then wait for every worker to finish."""
        self._event.set()
        for process in self._processes:
            process.join()
- 或者使用`__getstate__`,专门把实例属性`self._processes`从`target`参数的序列化中排除:
class Application:
    """Run one worker process per CPU core until ``stop`` is called.

    The worker is a bound method, so pickling a process target pickles this
    instance. ``__getstate__`` leaves ``_processes`` out of the pickled
    state so that already-started processes are never pickled.
    """

    def __init__(self):
        """Create the shared stop event and the not-yet-started workers."""
        self._event = multiprocessing.Event()
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        """Print the current process name once per second until stopped."""
        while not self._event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        """Start every worker process, printing 'starting' before each one."""
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        """Set the stop event, then wait for every worker to finish."""
        self._event.set()
        for process in self._processes:
            process.join()

    def __getstate__(self):
        """Return the state to pickle: every attribute except ``_processes``."""
        return {
            name: value
            for name, value in vars(self).items()
            if name != '_processes'
        }