为什么multiprocessing queue .get()在队列关闭后阻塞?


from multiprocessing import Process, Queue
def worker_main(q):
while True:
message = q.get()

q = Queue()
worker = Process(target=worker_main, args=(q,))
worker.start()
q.close()
worker.join()

我期望worker_main()中对q.get的调用抛出异常并在q关闭后退出。相反,它会在主进程中队列关闭后挂起。

我的用例似乎与显示Queue.put在工作进程和Queue.get在主进程中的常见示例略有不同。

在主进程中,我正在生成需要通过队列分发到工作进程池的任务。然而,当任务完成时,我关闭队列以指示工作进程是时候退出了。

也许我不理解文档,但我认为很明显,将来对get的调用应该在close之后引发异常。

get([block[, timeout]])

从队列中移除并返回一个项目。如果可选参数blockTrue(默认值),timeoutNone(默认值),则在必要时阻塞,直到有项目可用。如果timeout是一个正数,它最多阻塞timeout秒,如果在此时间内没有项目可用,则引发queue.Empty异常。否则(block为False),如果有立即可用的项目,则返回一个项目,否则引发queue.Empty异常(在这种情况下timeout被忽略)。

在3.8版更改:如果队列关闭,则引发ValueError而不是OSError

multiprocessing.Queue表示共享缓冲区上的句柄。值得注意的是,当进程与队列连接时,每个进程都有自己的句柄副本。

调用.close()只关闭当前进程的读写句柄。

close ()

指示当前进程将不再向该队列中放入数据。[…]


为了优雅地为所有订阅者关闭队列,发送一个"关闭消息"。这通常是一个定义良好的哨兵对象,如None

from multiprocessing import Process, Queue
def worker_main(q):
while (message := q.get()) is not None:
print(message)

if __name__ == "__main__":
q = Queue()
q.put("Hello")
worker = Process(target=worker_main, args=(q,))
worker.start()
q.put("World")
q.put(None)  # close message
q.close()
worker.join()

最新更新