如何从多处理队列继承



使用以下代码,似乎传递给工作线程的队列实例未初始化:

from multiprocessing import Process
from multiprocessing.queues import Queue
class MyQueue(Queue):
    def __init__(self, name):
        Queue.__init__(self)
        self.name = name
def worker(queue):
    print queue.name
if __name__ == "__main__":
    queue = MyQueue("My Queue")
    p = Process(target=worker, args=(queue,))
    p.start()
    p.join()

这将引发:

... line 14, in worker
    print queue.name
AttributeError: 'MyQueue' object has no attribute 'name'

我无法重新初始化队列,因为我会丢失 queue.name 的原始值,甚至将队列的名称作为参数传递给worker(这应该有效,但这不是一个干净的解决方案)。

那么,如何从multiprocessing.queues.Queue继承而不会收到此错误?

在 POSIX 上,Queue对象通过简单继承共享到子进程。

在Windows上,这是不可能的,所以它必须腌制Queue,通过管道将其发送给孩子,然后解开它。

(这可能并不明显,因为如果您实际上尝试腌制Queue,您会得到一个例外,RuntimeError: MyQueue objects should only be shared between processes through inheritance。如果你浏览源代码,你会发现这真的是一个谎言——只有当你试图腌制一个Queue时,它才会引发这个异常,而multiprocess不在生成子进程的过程中。

当然,通用的酸

洗和脱酸洗不会有任何好处,因为您最终会得到两个相同的队列,而不是两个进程中的相同队列。因此,multiprocessing通过添加对象在脱洗时使用的register_after_fork机制来扩展内容。 如果您查看Queue的源代码,您可以看到它是如何工作的。

但是你真的不需要知道它是如何工作的钩子;你可以像任何其他类的酸洗一样钩住它。例如,这应该有效:***

def __getstate__(self):
    return self.name, super(MyQueue, self).__getstate__()
def __setstate__(self, state):
    self.name, state = state
    super(MyQueue, self).__setstate__(state)

有关更多详细信息,pickle文档解释了影响类腌制方式的不同方式。

(如果它不起作用,并且我没有犯一个愚蠢的错误......那么你至少必须知道一点关于它是如何工作的......但很可能只是为了弄清楚是在_after_fork()之前还是之后做额外的工作,这只需要交换最后两行......


* 我不确定它是否真的保证在 POSIX 平台上使用简单的分叉继承。这恰好在 2.7 和 3.3 上是正确的。但是有一个分支multiprocessing在所有平台上使用Windows风格的pickle-everything以保持一致性,另一个分支在OS X上使用混合体,允许在单线程模式下使用CoreFoundation,或者类似的东西,这显然是可行的。

**实际上,我认为Queue只是为了方便起见而使用register_after_fork,如果没有它就可以重写......但这取决于Pipe在Windows上的_after_forkLock和POSIX上的BoundedSemaphore中所做的魔力。

这才是正确的,因为我碰巧从阅读源代码中知道Queue是一个新风格的类,不会覆盖__reduce____reduce_ex,并且永远不会从__getstate__返回错误值。如果你不知道这一点,你将不得不编写更多的代码。

最新更新