先看关键结果:
✗ ~/.pyenv/versions/3.7.11/bin/python demo_multiprocessing.py
<Process(Process-6, started)> is putting 4 in the list
<Process(Process-2, started)> is putting 0 in the list
<Process(Process-4, started)> is putting 2 in the list
<Process(Process-5, started)> is putting 3 in the list
<Process(Process-3, started)> is putting 1 in the list
<Process(Process-7, started)> is putting 5 in the list
all processes finished, here's what we have in the list:
[4, 0, 2, 3, 1, 5]
✗ ~/.pyenv/versions/3.9.11/bin/python demo_multiprocessing.py
<Process name='Process-4' parent=74535 started> is putting 2 in the list
<Process name='Process-5' parent=74535 started> is putting 3 in the list
<Process name='Process-6' parent=74535 started> is putting 4 in the list
<Process name='Process-2' parent=74535 started> is putting 0 in the list
<Process name='Process-7' parent=74535 started> is putting 5 in the list
<Process name='Process-3' parent=74535 started> is putting 1 in the list
all processes finished, here's what we have in the list:
[]
这是代码:
from multiprocessing import Process, current_process, Manager
from typing import List
from random import shuffle
class StreamInterface:
    """Interface for sinks that accept integers through ``write``."""

    def write(self, x: int) -> None:
        """Consume *x*. Concrete streams must override this.

        Raises:
            NotImplementedError: always, when called on the base class.
        """
        # ``NotImplemented`` is the binary-operator sentinel, not an
        # exception class; ``raise NotImplemented`` itself fails with TypeError.
        raise NotImplementedError(
            f"{type(self).__name__} must implement write()"
        )
class ListStream(StreamInterface):
    """Stream that appends written integers to a manager-backed shared list.

    Only the list proxy is stored on the instance.
    """

    def __init__(self, manager: Manager):
        """Create the shared list on *manager*.

        The manager itself is deliberately not kept. Nothing uses it after
        the list is created, and holding a reference to it would make the
        stream impossible to pickle and hand to a spawned child process.
        """
        self.shared_list = manager.list()

    def write(self, x: int) -> None:
        """Append *x* to the shared list."""
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        """Return everything written so far and remove it from the list.

        Only the items that were copied are deleted. A value appended by
        another process between the copy and the delete therefore stays in
        the list for the next call instead of being silently dropped.
        """
        drained = list(self.shared_list)
        del self.shared_list[:len(drained)]
        return drained
class NullStream(StreamInterface):
    """A stream that accepts writes and throws the values away."""

    def write(self, x: int):
        """Discard *x* without doing anything."""
# How many worker processes to launch; worker i writes the value i.
n_processors = 6
def do_work(i):
    """Report which process is handling *i*, then write *i* to ``ShiftPlanner.list_stream``.

    The stream is looked up on the class at call time, so the worker uses
    whatever value the class attribute has in its own process.
    """
    me = current_process()
    print(me, " is putting ", i, " in the list")
    target = ShiftPlanner.list_stream
    target.write(i)
class ShiftPlanner:
    """Holder for the class-level stream that ``do_work`` writes to.

    Each process has its own copy of this class. A value assigned in one
    process is not seen by processes that re-import the module.
    """

    # Default: a stream that discards writes until someone rebinds this.
    list_stream: StreamInterface = NullStream()
if __name__ == "__main__":
    with Manager() as manager:
        x = ListStream(manager)
        # NOTE: this rebinding happens only in the parent. Under the "spawn"
        # start method the children re-import this module, so they still see
        # the default NullStream and their writes are discarded.
        ShiftPlanner.list_stream = x

        processes = []
        for idx in range(n_processors):
            processes.append(Process(target=do_work, args=(idx,)))
        shuffle(processes)  # start the workers in random order

        for proc in processes:
            proc.start()
        for proc in processes:
            proc.join()

        print("all processes finished, here's what we have in the list:")
        print(x.pop_all())
        ShiftPlanner.list_stream = NullStream()
这是怎么回事?提示:类变量的处理方式似乎发生了变化。如果我在 `write` 方法中检查,会发现子进程认为 `self.list_stream` 是默认值 `NullStream()`。我很希望能了解这一变化的更多细节,以及解决这个问题最优雅、最惯用的方法——谢谢!
以这种方式在主进程中修改类属性,并不会反映到已启动的子进程中(当启动方法为 spawn 时)。这是因为 `if __name__ == "__main__":` 子句中的代码只在主进程中运行,而子进程会重新导入主模块来复制父进程的状态。因此,对子进程而言,类 `ShiftPlanner` 仍然是最初定义时的样子。
一个简单的解决方案是把 `ListStream` 对象传给每个子进程,让它们在各自的内存空间中自行设置该类属性。但这样做要求不要在 `ListStream` 中保存管理器(manager)对象,否则该对象无法被序列化并传递给子进程。
可运行的示例代码:
from multiprocessing import Process, current_process, Manager
from typing import List
from random import shuffle
class StreamInterface:
    """Interface for sinks that accept integers through ``write``."""

    def write(self, x: int) -> None:
        """Consume *x*. Concrete streams must override this.

        Raises:
            NotImplementedError: always, when called on the base class.
        """
        # ``NotImplemented`` is the binary-operator sentinel, not an
        # exception class; ``raise NotImplemented`` itself fails with TypeError.
        raise NotImplementedError(
            f"{type(self).__name__} must implement write()"
        )
class ListStream(StreamInterface):
    """Stream that appends written integers to a manager-backed shared list.

    Only the list proxy is stored on the instance, so the stream can be
    passed as an argument to child processes.
    """

    def __init__(self, manager: Manager):
        """Create the shared list on *manager* (the manager is not kept)."""
        self.shared_list = manager.list()

    def write(self, x: int) -> None:
        """Append *x* to the shared list."""
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        """Return everything written so far and remove it from the list.

        Only the items that were copied are deleted. A value appended by
        another process between the copy and the delete therefore stays in
        the list for the next call instead of being silently dropped.
        """
        drained = list(self.shared_list)
        del self.shared_list[:len(drained)]
        return drained
class NullStream(StreamInterface):
    """A stream that accepts writes and throws the values away."""

    def write(self, x: int):
        """Discard *x* without doing anything."""
# How many worker processes to launch; worker i writes the value i.
n_processors = 6
def do_work(i, x):
    """Worker entry point: adopt stream *x*, then write *i* to it.

    The class attribute is assigned here, inside the worker, because an
    assignment made only in the parent is not seen by a spawned child.
    """
    ShiftPlanner.list_stream = x
    me = current_process()
    print(me, " is putting ", i, " in the list")
    stream = ShiftPlanner.list_stream
    stream.write(i)
class ShiftPlanner:
    """Holder for the class-level stream that ``do_work`` writes to.

    Each process has its own copy of this class. A value assigned in one
    process is not seen by processes that re-import the module.
    """

    # Default: a stream that discards writes until someone rebinds this.
    list_stream: StreamInterface = NullStream()
if __name__ == "__main__":
    with Manager() as manager:
        x = ListStream(manager)
        ShiftPlanner.list_stream = x  # affects only the parent process

        # Hand the stream to every worker explicitly; each worker installs
        # it on ShiftPlanner in its own process.
        processes = []
        for idx in range(n_processors):
            processes.append(Process(target=do_work, args=(idx, x)))
        shuffle(processes)  # start the workers in random order

        for proc in processes:
            proc.start()
        for proc in processes:
            proc.join()

        print("all processes finished, here's what we have in the list:")
        print(x.pop_all())
        ShiftPlanner.list_stream = NullStream()
问题其实与类变量无关,而是 macOS 上默认的启动方法从 fork 变成了 spawn。
来自文档:“在 3.8 版本中更改:在 macOS 上,spawn 启动方法现在是默认值。fork 启动方法应被视为不安全的,因为它可能导致子进程崩溃。”
代码可以这样修复:
from multiprocessing import Manager, current_process, get_context
from random import shuffle
from typing import List
class StreamInterface:
    """Interface for sinks that accept integers through ``write``."""

    def write(self, x: int) -> None:
        """Consume *x*. Concrete streams must override this.

        Raises:
            NotImplementedError: always, when called on the base class.
        """
        # ``NotImplemented`` is the binary-operator sentinel, not an
        # exception class; ``raise NotImplemented`` itself fails with TypeError.
        raise NotImplementedError(
            f"{type(self).__name__} must implement write()"
        )
class ListStream(StreamInterface):
    """Stream that appends written integers to a manager-backed shared list.

    Only the list proxy is stored on the instance.
    """

    def __init__(self, manager: Manager):
        """Create the shared list on *manager*.

        The manager itself is not kept: nothing uses it after the list is
        created, and holding it would make the stream unpicklable.
        """
        self.shared_list = manager.list()

    def write(self, x: int) -> None:
        """Append *x* to the shared list."""
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        """Return everything written so far and remove it from the list.

        Only the items that were copied are deleted. A value appended by
        another process between the copy and the delete therefore stays in
        the list for the next call instead of being silently dropped.
        """
        drained = list(self.shared_list)
        del self.shared_list[:len(drained)]
        return drained
class NullStream(StreamInterface):
    """A stream that accepts writes and throws the values away."""

    def write(self, x: int):
        """Discard *x* without doing anything."""
# How many worker processes to launch; worker i writes the value i.
n_processors = 6
def do_work(i):
    """Report which process is handling *i*, then write *i* to ``ShiftPlanner.list_stream``.

    The stream is looked up on the class at call time. Under the fork start
    method the worker inherits the parent's assignment to that attribute.
    """
    me = current_process()
    print(me, " is putting ", i, " in the list")
    target = ShiftPlanner.list_stream
    target.write(i)
class ShiftPlanner:
    """Holder for the class-level stream that ``do_work`` writes to.

    Each process has its own copy of this class. A value assigned in one
    process is not seen by processes that re-import the module.
    """

    # Default: a stream that discards writes until someone rebinds this.
    list_stream: StreamInterface = NullStream()
if __name__ == "__main__":
    # Explicitly request "fork" so the workers inherit the parent's state,
    # including the ShiftPlanner.list_stream assignment below. The Python
    # docs warn that fork should be considered unsafe on macOS, and it is
    # only available on POSIX systems.
    ctx = get_context('fork')
    with Manager() as manager:
        x = ListStream(manager)
        ShiftPlanner.list_stream = x

        processes = []
        for idx in range(n_processors):
            processes.append(ctx.Process(target=do_work, args=(idx,)))
        shuffle(processes)  # start the workers in random order

        for proc in processes:
            proc.start()
        for proc in processes:
            proc.join()

        print("all processes finished, here's what we have in the list:")
        print(x.pop_all())
        ShiftPlanner.list_stream = NullStream()