多处理的 queue.get() 何时返回 DONE?



我正在学习Python多处理模块,我发现了这个例子:

from multiprocessing import Process, Queue
import time
def reader(queue):
    ## Read from the queue
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break
def writer(count, queue):
    ## Write to the queue
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')
if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        queue = Queue()   # reader() reads from queue
                          # writer() writes to queue
        reader_p = Process(target=reader, args=((queue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader() as a separate python process
        _start = time.time()
        writer(count, queue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print "Sending %s numbers to Queue() took %s seconds" % (count, 
            (time.time() - _start))

我想知道queue.get()何时会返回DONE,所以我尝试了以下示例:

#!/bin/env python
from multiprocessing import Process, Queue
import time
if __name__=='__main__':
  queue = Queue()
  print "Before 2x put"
  queue.put(10)
  queue.put(20)
  print "After 2x put"
  print "Before 1s get"
  print queue.get()
  print "After 1st get"
  print "Before 2nd get"
  print queue.get()
  print "After 2nd get"
  print "Before 3rd get"
  print queue.get()
  print "After 3rd get"

这个脚本的最后一个消息是Before 3rd get,在此之后,脚本卡住了,结束它的唯一方法就是终止它。从这个例子中,您可以看到queue.get()正在阻塞(代码在结束之前不会继续)。当这种情况发生时,如何可能在原始代码中queue.get()返回DONE ?

编辑

回复@KemyLand很好地解释了这里发生的事情,这是没有卡住的版本:

#!/bin/env python
from multiprocessing import Process, Queue
import time
if __name__=='__main__':
  queue = Queue()
  print "Before 2x put"
  queue.put(10)
  queue.put(20)
  print "After 2x put"
  print "Before 1s get"
  print queue.get()
  print "After 1st get"
  print "Before 2nd get"
  print queue.get()
  print "After 2nd get"
  print "Before 3rd get"
  try:
    print queue.get_nowait()
    print "After 3rd get"
  except:
    pass

这很简单。

在你的第一个代码中,readerwriter之间达成的"协议"是 writerreader发送任意数量的数据,然后writer发送'DONE', reader接收它并理解数据传输完成。

在你的第二个代码中,readerwriter 之间没有一致的协议,因为作者的观点是"我发送两个对象,我完成了!",而读者的观点是"*我收到三个对象,我完成了!"。

因为运行时环境的任何部分都无法检测何时发生协议错误,所以应用程序只是阻塞,等待永远不会出现的数据。唯一能够检测到这种情况的是应用程序本身,因为它是唯一知道它所遵循的协议的应用程序。出于这个目的,您可以使用Queue.Queue.get_nowait() (必须 import Queue(大写Q),因为multiprocessing.Queue只是Queue.Queue的别名)。如果这样的函数不能立即从Queue中提取对象,它将抛出Queue.Empty异常。(注意:这个混乱的模块名称在Python 3中已经修复了)。

最新更新