Why does "fork" work but "spawn" fail in Python 3.8+ multiprocessing?



I work on macOS and was recently caught out by the change of the default start method from "fork" to "spawn" in Python 3.8's multiprocessing (see the documentation). The simplified working example below succeeds with "fork" but fails with "spawn". The purpose of the code is to create a custom queue object that supports calling size() under macOS, hence inheriting from the Queue object and getting multiprocessing's context.

import multiprocessing
from multiprocessing import Process
from multiprocessing.queues import Queue
from time import sleep

class Q(Queue):
    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = 1

    def call(self):
        return print(self.size)

def foo(q):
    q.call()

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')  # this would fail
    # multiprocessing.set_start_method('fork')  # this would succeed
    q = Q()
    p = Process(target=foo, args=(q,))
    p.start()
    p.join(timeout=1)

The error message when using "spawn" is shown below.

Process Process-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/fanchen/Private/python_work/sandbox.py", line 23, in foo
    q.call()
  File "/Users/fanchen/Private/python_work/sandbox.py", line 19, in call
    return print(self.size)
AttributeError: 'Q' object has no attribute 'size'

It seems the child process decided that self.size is not needed for the code to execute, so it was not copied over. My question is: why does this happen?
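
Poking around for clues, the parent class's pickling hook can be inspected directly; it appears to return only a fixed tuple of internal fields, which would explain where a custom attribute gets dropped (this is just an observation, not a fix):

```python
import inspect
from multiprocessing.queues import Queue

# Print the source of the pickling hook that "spawn" relies on.
# It returns a fixed tuple of internal attributes (_maxsize, the
# pipe ends, locks, ...), so anything else attached to the instance,
# such as self.size, never reaches the child process.
print(inspect.getsource(Queue.__getstate__))
```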

Code snippet tested under macOS Catalina 10.15.6, Python 3.8.5.

The problem is that spawned processes do not share the parent's resources. With "spawn", the queue instance is pickled and sent to the child, and `multiprocessing.queues.Queue.__getstate__()` returns only a fixed tuple of internal attributes, silently dropping anything else you attached (such as `self.size`). To re-create the queue instance correctly in each process, you therefore need to add serialization (`__getstate__`) and deserialization (`__setstate__`) methods that carry the extra state. Here is working code:

# Portable queue
# The idea of Victor Terron used in the Lemon project (https://github.com/vterron/lemon/blob/master/util/queue.py).
# Pickling/unpickling methods are added to share a Queue instance between processes correctly.
import multiprocessing
import multiprocessing.queues

class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """
    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def __getstate__(self):
        return (self.count,)

    def __setstate__(self, state):
        (self.count,) = state

    def increment(self, n=1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value

class Queue(multiprocessing.queues.Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
        self._counter = SharedCounter(0)

    def __getstate__(self):
        return super().__getstate__() + (self._counter,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self._counter = state[-1]

    def put(self, *args, **kwargs):
        super().put(*args, **kwargs)
        self._counter.increment(1)

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        self._counter.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self._counter.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()
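
Applied back to the question's minimal example, the same pickling fix can be sketched like this (a minimal sketch; the child sends the attribute back through the queue to show that it survived the spawn):

```python
import multiprocessing
from multiprocessing import Process
from multiprocessing.queues import Queue

class Q(Queue):
    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = 1

    def __getstate__(self):
        # Append the custom attribute to the state tuple that the
        # parent class pickles when the child process is spawned.
        return super().__getstate__() + (self.size,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self.size = state[-1]

def foo(q):
    # The spawned child can now see the attribute and report it back.
    q.put(q.size)

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    q = Q()
    p = Process(target=foo, args=(q,))
    p.start()
    result = q.get()  # the value of q.size as seen by the child
    p.join()
    print(result)
```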

You can also use `multiprocessing.Manager().Queue()` instead.
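
A manager queue sidesteps the problem entirely, since the child only receives a proxy and the real `queue.Queue` lives in the manager process, where `qsize()` works even on macOS. A minimal sketch:

```python
import multiprocessing

def worker(q):
    # q is a proxy; qsize() is serviced by the manager process,
    # so it does not depend on sem_getvalue() being implemented.
    q.put(q.qsize())

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    with multiprocessing.Manager() as manager:
        q = manager.Queue()
        q.put('item')
        p = multiprocessing.Process(target=worker, args=(q,))
        p.start()
        p.join()
        first = q.get()   # 'item', put by the parent
        second = q.get()  # 1, the qsize the child observed
    print(first, second)
```

The trade-off is that every queue operation becomes a round-trip to the manager process, which is slower than the pipe-based `multiprocessing.Queue`.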
