使用以下代码,似乎传递给工作线程的队列实例未初始化:
from multiprocessing import Process
from multiprocessing.queues import Queue
class MyQueue(Queue):
def __init__(self, name):
Queue.__init__(self)
self.name = name
def worker(queue):
print queue.name
if __name__ == "__main__":
queue = MyQueue("My Queue")
p = Process(target=worker, args=(queue,))
p.start()
p.join()
这将引发:
... line 14, in worker
print queue.name
AttributeError: 'MyQueue' object has no attribute 'name'
我无法重新初始化队列,因为我会丢失 queue.name 的原始值,甚至将队列的名称作为参数传递给worker(这应该有效,但这不是一个干净的解决方案)。
那么,如何从multiprocessing.queues.Queue继承而不会收到此错误?
在 POSIX 上,Queue
对象通过简单继承共享到子进程。
在Windows上,这是不可能的,所以它必须腌制Queue
,通过管道将其发送给孩子,然后解开它。
(这可能并不明显,因为如果您实际上尝试腌制Queue
,您会得到一个例外,RuntimeError: MyQueue objects should only be shared between processes through inheritance
。如果你浏览源代码,你会发现这真的是一个谎言——只有当你试图腌制一个Queue
时,它才会引发这个异常,而multiprocess
不在生成子进程的过程中。
洗和脱酸洗不会有任何好处,因为您最终会得到两个相同的队列,而不是两个进程中的相同队列。因此,multiprocessing
通过添加对象在脱洗时使用的register_after_fork
机制来扩展内容。 如果您查看Queue
的源代码,您可以看到它是如何工作的。
但是你真的不需要知道它是如何工作的钩子;你可以像任何其他类的酸洗一样钩住它。例如,这应该有效:***
def __getstate__(self):
return self.name, super(MyQueue, self).__getstate__()
def __setstate__(self, state):
self.name, state = state
super(MyQueue, self).__setstate__(state)
有关更多详细信息,pickle
文档解释了影响类腌制方式的不同方式。
(如果它不起作用,并且我没有犯一个愚蠢的错误......那么你至少必须知道一点关于它是如何工作的......但很可能只是为了弄清楚是在_after_fork()
之前还是之后做额外的工作,这只需要交换最后两行......
* 我不确定它是否真的保证在 POSIX 平台上使用简单的分叉继承。这恰好在 2.7 和 3.3 上是正确的。但是有一个分支multiprocessing
在所有平台上使用Windows风格的pickle-everything以保持一致性,另一个分支在OS X上使用混合体,允许在单线程模式下使用CoreFoundation
,或者类似的东西,这显然是可行的。
**实际上,我认为Queue
只是为了方便起见而使用register_after_fork
,如果没有它就可以重写......但这取决于Pipe
在Windows上的_after_fork
或Lock
和POSIX上的BoundedSemaphore
中所做的魔力。
这才是正确的,因为我碰巧从阅读源代码中知道Queue
是一个新风格的类,不会覆盖__reduce__
或__reduce_ex
,并且永远不会从__getstate__
返回错误值。如果你不知道这一点,你将不得不编写更多的代码。