放一个多进程.在多进程中排队.队列爆炸



以下代码抛出异常并在python 2.7和3.3中打印123。

from multiprocessing import Queue
class Pool(object):
    def __init__(self):
        self.q = Queue()
p = Pool()
p.q.put(p)
print(123)

实际上是一种竞态条件,如下图所示:

yuv@yuvpad2:~/$ python3.3 t.py
123
Traceback (most recent call last):
  File "/home/yuv/Downloads/Python-3.3.0/Lib/multiprocessing/queues.py", line 249, in _feed
yuv@yuvpad2:~/$ 

完整的错误是RuntimeError: Queue objects should only be shared between processes through inheritance,追溯根本没有解释它是如何/在哪里发生的。问题的根源在于Queue中的对象不能引用Queue。我的实际用例实际上是一个工作者对象和一个池对象,其中一个工作者向池的Queue报告它完成了工作。所以我想让工作线程将自己发送回工作线程Queue

我不使用Queue.Queue的原因,虽然多线程对我的情况很好,是因为在Python 2.7中有一个bug,使queue.get()忽略Ctrl-C,这很烦人。

是否有一种方法可以干净地完成这个模式?

真正的问题代码在codepad

报错的原因是:

p.q.put(p)

在这里,你试图将一个引用Queue的对象放入队列。队列是用来在进程之间通信的,它的工作方式是通过pickle你试图放入的任何东西,然后在另一个进程中取出它——但是pickle Queue是不可能的,甚至没有意义。

这就是为什么当你尝试pickle一个队列时,你会得到你提到的错误:

>>> from multiprocessing import Queue
>>> q = Queue()
>>> import pickle
>>> pickle.dumps(q)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.2/multiprocessing/queues.py", line 77, in __getstate__
    assert_spawning(self)
  File "/usr/lib/python3.2/multiprocessing/forking.py", line 51, in assert_spawning
    ' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

如果你想使用Queue在进程之间共享数据,一种方法是这样做的:

class Worker(multiprocessing.Process):
    queue = multiprocessing.Queue()
    def run(self):
        print(self.queue.get())
        ...

更多示例请查看docs

我想有几种方法可以做到这一点,但在不知道您不想做什么的情况下,很难推荐一种方法。

我想最简单的方法是使用2个不同的队列。一个给进厂工人,一个给完工工人。

在代码中追溯没有显示问题的原因是multiprocessing.Queue类启动了一个后台线程,并且在该线程中生成了异常。我得到了下面的trace…

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
    assert_spawning(self)
  File "/usr/lib/python2.7/multiprocessing/forking.py", line 51, in assert_spawning
    ' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

…我怀疑这是由…

p.q.put(p)

…其中,您似乎将包含Queue对象的Pool对象放入Queue中,这是不允许的,因此出现错误。

如果你想要一个有用的解决方案,这将有助于明确你想要达到的目标。

最新更新