如何从类启动和停止多个子进程?



Python程序:

import multiprocessing
import time

class Application:
def __init__(self):
self._event = multiprocessing.Event()
self._processes = [
multiprocessing.Process(target=self._worker)
for _ in range(multiprocessing.cpu_count())]
def _worker(self):
while not self._event.is_set():
print(multiprocessing.current_process().name)
time.sleep(1)
def start(self):
for process in self._processes:
print('starting')
process.start()
def stop(self):
self._event.set()
for process in self._processes:
process.join()

if __name__ == '__main__':
application = Application()
application.start()
time.sleep(3)
application.stop()

其输出:

starting
starting
Traceback (most recent call last):
File "/Users/maggyero/Desktop/application.py", line 31, in <module>
application.start()
File "/Users/maggyero/Desktop/application.py", line 21, in start
process.start()
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
reduction.dump(process_obj, fp)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory

在函数Application.__init__中,每个调用multiprocessing.Process(target=self._worker)都使用实例方法self._worker作为其target参数来初始化一个multiprocessing.Process实例。self._worker绑定到实例属性self._processesself

在函数Application.start中,每个调用process.start()序列化target参数,因此self._processesself._processes是最初尚未启动的multiprocessing.Process实例的列表。第一个调用process.start()启动该列表中的第一个multiprocessing.Process实例,没有问题,但第二个调用process.start()失败。

因此,无法序列化已启动的multiprocessing.Process实例。如何解决这个问题?

问题的根源在于multiprocessing.Process实例的start方法将其_popen实例属性设置为multiprocessing.popen_*.Popen实例。该实例的初始化执行以下两个步骤(以及其他步骤):

  1. 对于multiprocessing.popen_spawn_posix.Popen实例、multiprocessing.popen_spawn_win32.Popen实例或multiprocessing.popen_forkserver.Popen实例,但不是multiprocessing.popen_fork.Popen实例(即对于启动方法'spawn'或启动方法'forkserver'但不是启动方法'fork'),它会序列化multiprocessing.Process实例,以将其写入父进程用于与子进程通信的管道末尾,以便子进程可以执行runmultiprocessing.Process实例的方法。

  2. 它将其finalizer实例属性设置为multiprocessing.util.Finalize实例,该实例本身将其_weakref实例属性设置为weakref.ref实例,以便在解释器退出父进程用于与子进程通信的管道末端时关闭。换句话说,它使multiprocessing.Process实例持有弱引用。

因此,如果一个multiprocessing.Process实例持有对已启动multiprocessing.Process实例的引用,那么它持有一个弱引用(第 2 点),因此启动它将失败,因为它将序列化(第 1 点),弱引用和弱引用不可序列化:

import multiprocessing
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')  # or 'forkserver' but not 'fork'
process_a = multiprocessing.Process()
process_b = multiprocessing.Process()
process_b.foo = process_a
process_a.start()  # creates process_a._popen.finalizer._weakref
process_b.start()  # TypeError: cannot pickle 'weakref' object

显示序列化问题的最小 Python 程序:

import pickle
import weakref
pickle.dumps(weakref.ref(int))  # TypeError: cannot pickle 'weakref' object

避免序列化问题的两种解决方法:

  • 要么使target参数成为classmethod,以便它不绑定到self(特别是实例属性self._processes):
class Application:
def __init__(self):
self._event = multiprocessing.Event()
self._processes = [
multiprocessing.Process(target=self._worker, args=(self._event,))
for _ in range(multiprocessing.cpu_count())]
@classmethod
def _worker(self, event):
while not self._event.is_set():
print(multiprocessing.current_process().name)
time.sleep(1)
def start(self):
for process in self._processes:
print('starting')
process.start()
def stop(self):
self._event.set()
for process in self._processes:
process.join()
  • 或者使用__getstate__target参数的序列化中专门排除实例属性self._processes
class Application:
def __init__(self):
self._event = multiprocessing.Event()
self._processes = [
multiprocessing.Process(target=self._worker)
for _ in range(multiprocessing.cpu_count())]
def _worker(self):
while not self._event.is_set():
print(multiprocessing.current_process().name)
time.sleep(1)
def start(self):
for process in self._processes:
print('starting')
process.start()
def stop(self):
self._event.set()
for process in self._processes:
process.join()
def __getstate__(self):
state = {}
for key, value in vars(self).items():
if key != '_processes':
state[key] = value
return state

相关内容

  • 没有找到相关文章

最新更新