Python 中的类变量与多进程（multiprocessing）——发生了什么变化？



先说结论：

✗ ~/.pyenv/versions/3.7.11/bin/python demo_multiprocessing.py
<Process(Process-6, started)>  is putting  4  in the list
<Process(Process-2, started)>  is putting  0  in the list
<Process(Process-4, started)>  is putting  2  in the list
<Process(Process-5, started)>  is putting  3  in the list
<Process(Process-3, started)>  is putting  1  in the list
<Process(Process-7, started)>  is putting  5  in the list
all processes finished, here's what we have in the list:
[4, 0, 2, 3, 1, 5]
✗ ~/.pyenv/versions/3.9.11/bin/python demo_multiprocessing.py
<Process name='Process-4' parent=74535 started>  is putting  2  in the list
<Process name='Process-5' parent=74535 started>  is putting  3  in the list
<Process name='Process-6' parent=74535 started>  is putting  4  in the list
<Process name='Process-2' parent=74535 started>  is putting  0  in the list
<Process name='Process-7' parent=74535 started>  is putting  5  in the list
<Process name='Process-3' parent=74535 started>  is putting  1  in the list
all processes finished, here's what we have in the list:
[]

这是代码:

from multiprocessing import Process, current_process, Manager
from typing import List
from random import shuffle
class StreamInterface:
    """Minimal interface for a sink that accepts integers."""

    def write(self, x: int) -> None:
        """Write ``x`` to the stream; concrete subclasses must override this.

        Raises:
            NotImplementedError: always, on the base interface.
        """
        # ``NotImplemented`` is a comparison sentinel, not an exception:
        # ``raise NotImplemented`` would fail with a TypeError instead.
        raise NotImplementedError("subclasses must implement write()")
class ListStream(StreamInterface):
    """Stream that appends integers to a list created by a multiprocessing manager.

    NOTE(review): the manager itself is stored on the instance, which keeps
    instances from being passed to spawned children as arguments.
    """

    def __init__(self, manager: Manager):
        # Manager is a factory; the object it returns is a SyncManager.
        self._manager = manager
        self.shared_list = self._manager.list()

    def write(self, x: int):
        """Append ``x`` to the shared list."""
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        """Remove and return every item currently in the shared list.

        Only the items captured in the snapshot are deleted. Values appended
        by other processes between the read and the delete therefore stay in
        the list for the next call instead of being silently discarded.
        """
        # A single slice fetches the snapshot in one call. The list proxy
        # exposes no __iter__, so list(proxy) would index it item by item.
        snapshot = self.shared_list[:]
        del self.shared_list[:len(snapshot)]
        return snapshot

class NullStream(StreamInterface):
    """Stream that discards everything written to it."""

    def write(self, x: int):
        # Deliberately a no-op sink.
        return None

n_processors = 6  # number of worker processes the demo starts


def do_work(i):
    """Log ``i`` and hand it to the stream currently set on ShiftPlanner.

    In a child started with the spawn method, the module is re-imported, so
    this is the class default (NullStream) and the value is dropped.
    """
    proc = current_process()
    print(proc, " is putting ", i, " in the list")
    sink = ShiftPlanner.list_stream
    sink.write(i)
class ShiftPlanner:
    """Holder for the stream that ``do_work`` writes to."""

    # Class attribute; starts out as a no-op sink.
    list_stream: StreamInterface = NullStream()

if __name__ == "__main__":
    # Demo driver: give ShiftPlanner a manager-backed stream, start one worker
    # per value, then print what the workers wrote.
    with Manager() as manager:
        x = ListStream(manager)
        # This rebinds the class attribute in the parent process only. With the
        # spawn start method (the macOS default since Python 3.8), each child
        # re-imports this module without running this block. The child's
        # ShiftPlanner.list_stream is therefore still the NullStream default,
        # and the printed list comes out empty.
        ShiftPlanner.list_stream = x
        processes = [
            Process(
                target=do_work,
                args=(i,)
            )
            for i in range(n_processors)
        ]
        # Start the workers in a random order.
        shuffle(processes)
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print("all processes finished, here's what we have in the list:")
        # Must run while the manager is still alive: the list lives in its server.
        print(x.pop_all())
        ShiftPlanner.list_stream = NullStream()

发生了什么？提示：类变量的处理方式似乎发生了变化。如果我在 `write` 方法里打印，会看到子进程认为 `self.list_stream` 是默认值 `NullStream()`。希望能了解更多关于这一变化的细节，以及解决这个问题最优雅、最地道的方法——谢谢！

以这种方式在主进程中修改类属性，并不会反映到已启动的子进程中（当启动方法为 spawn 时）。这是因为 `if __name__ == "__main__":` 子句中的代码只在主进程中运行，而子进程会重新导入主模块来重建父进程的状态。因此在子进程看来，`ShiftPlanner` 类仍然是最初定义时的样子。

一个简单的解决方案是把 `ListStream` 对象作为参数传给每个子进程，让子进程在各自的内存空间中自行设置类属性。但这样做要求不在 `ListStream` 中保存管理器（manager）：传给 spawn 子进程的参数必须能被 pickle，而管理器对象无法被 pickle。

可运行的示例代码：

from multiprocessing import Process, current_process, Manager
from typing import List
from random import shuffle
class StreamInterface:
    """Minimal interface for a sink that accepts integers."""

    def write(self, x: int) -> None:
        """Write ``x`` to the stream; concrete subclasses must override this.

        Raises:
            NotImplementedError: always, on the base interface.
        """
        # ``NotImplemented`` is a comparison sentinel, not an exception:
        # ``raise NotImplemented`` would fail with a TypeError instead.
        raise NotImplementedError("subclasses must implement write()")
class ListStream(StreamInterface):
    """Stream that appends integers to a list created by a multiprocessing manager."""

    def __init__(self, manager: Manager):
        # Keep only the list proxy, not the manager, so the stream can be
        # passed to child processes as an argument.
        self.shared_list = manager.list()

    def write(self, x: int):
        """Append ``x`` to the shared list."""
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        """Remove and return every item currently in the shared list.

        Only the items captured in the snapshot are deleted. Values appended
        by other processes between the read and the delete therefore stay in
        the list for the next call instead of being silently discarded.
        """
        # A single slice fetches the snapshot in one call. The list proxy
        # exposes no __iter__, so list(proxy) would index it item by item.
        snapshot = self.shared_list[:]
        del self.shared_list[:len(snapshot)]
        return snapshot

class NullStream(StreamInterface):
    """Sink that ignores every value it receives."""

    def write(self, x: int):
        return None  # intentionally does nothing

n_processors = 6  # number of worker processes to start


def do_work(i, x):
    """Install ``x`` as ShiftPlanner's stream in this process, then write ``i`` to it.

    The parent's class-attribute assignment is not visible in a spawned
    child, so each worker sets it in its own memory space.
    """
    planner = ShiftPlanner
    planner.list_stream = x
    worker = current_process()
    print(worker, " is putting ", i, " in the list")
    planner.list_stream.write(i)
class ShiftPlanner:
    """Namespace whose class attribute selects where ``do_work`` writes."""

    # Defaults to a no-op sink until a process assigns a real stream.
    list_stream: StreamInterface = NullStream()

if __name__ == "__main__":
    with Manager() as manager:
        stream = ListStream(manager)
        ShiftPlanner.list_stream = stream
        # Pass the stream to every worker explicitly: a spawned child does not
        # see the class-attribute assignment made above in the parent.
        workers = []
        for idx in range(n_processors):
            workers.append(Process(target=do_work, args=(idx, stream)))
        shuffle(workers)
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        print("all processes finished, here's what we have in the list:")
        print(stream.pop_all())
        ShiftPlanner.list_stream = NullStream()

问题其实与类变量无关，而是 macOS 上 multiprocessing 的默认启动方法从 fork 变成了 spawn：

来自官方文档：“3.8 版更改：在 macOS 上，spawn 启动方法现在是默认方法。fork 启动方法应被视为不安全，因为它可能导致子进程崩溃。”

代码可以这样修复：

from typing import List
from random import shuffle
from multiprocessing import get_context
class StreamInterface:
    """Minimal interface for a sink that accepts integers."""

    def write(self, x: int) -> None:
        """Write ``x`` to the stream; concrete subclasses must override this.

        Raises:
            NotImplementedError: always, on the base interface.
        """
        # ``NotImplemented`` is a comparison sentinel, not an exception:
        # ``raise NotImplemented`` would fail with a TypeError instead.
        raise NotImplementedError("subclasses must implement write()")
class ListStream(StreamInterface):
    """Stream that appends integers to a list created by a multiprocessing manager."""

    # The annotation is a string because this snippet does not import Manager.
    # A bare ``Manager`` annotation is evaluated when the def runs and would
    # raise NameError.
    def __init__(self, manager: "SyncManager"):
        self._manager = manager
        self.shared_list = self._manager.list()

    def write(self, x: int):
        """Append ``x`` to the shared list."""
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        """Remove and return every item currently in the shared list.

        Only the items captured in the snapshot are deleted. Values appended
        by other processes between the read and the delete therefore stay in
        the list for the next call instead of being silently discarded.
        """
        # A single slice fetches the snapshot in one call. The list proxy
        # exposes no __iter__, so list(proxy) would index it item by item.
        snapshot = self.shared_list[:]
        del self.shared_list[:len(snapshot)]
        return snapshot

class NullStream(StreamInterface):
    """Stream implementation that throws written values away."""

    def write(self, x: int):
        # No-op: the default destination when no real stream is configured.
        return None

n_processors = 6  # number of worker processes to start


def do_work(i):
    """Log ``i`` and write it to the stream currently set on ShiftPlanner.

    With the fork start method, the child inherits the parent's memory,
    including the ListStream the parent assigned to ShiftPlanner.
    """
    # Imported locally: this snippet's module-level imports do not provide
    # current_process, so the bare call raised NameError in every child.
    from multiprocessing import current_process

    print(current_process(), " is putting ", i, " in the list")
    ShiftPlanner.list_stream.write(i)
class ShiftPlanner:
    """Carrier for the stream that ``do_work`` writes to."""

    # Class-level default: a sink that ignores writes.
    list_stream: StreamInterface = NullStream()
if __name__ == "__main__":
    # Explicitly request the fork start method (spawn is the macOS default
    # since Python 3.8). A forked child inherits the parent's memory, so it
    # sees the ListStream assigned to ShiftPlanner below.
    # NOTE: the docs consider fork unsafe on macOS, and fork is only
    # available on POSIX systems.
    ctx = get_context('fork')
    # Take Manager from the same context. The bare name Manager is not
    # imported in this snippet and would raise NameError.
    with ctx.Manager() as manager:
        x = ListStream(manager)
        ShiftPlanner.list_stream = x
        processes = [
            ctx.Process(target=do_work, args=(i,))
            for i in range(n_processors)
        ]
        shuffle(processes)
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print("all processes finished, here's what we have in the list:")
        # Must run while the manager is still alive: the list lives in its server.
        print(x.pop_all())
        ShiftPlanner.list_stream = NullStream()

相关内容

最新更新