如何使用多处理.Queue.get方法



下面的代码将三个数字放在一个队列中。然后它尝试从队列中取回数字。但从来没有。如何从队列中获取数据?

import multiprocessing
queue = multiprocessing.Queue()
for i in range(3):
queue.put(i)
while not queue.empty():
print queue.get()

我最初在阅读@Martijn Pieters后删除了这个答案,因为他在更早的时候更详细地描述了"为什么这不起作用"。然后我意识到,OP示例中的用例不太符合的规范名称

"如何使用多访问.Queue.get方法"。

这不是因为没有涉及用于演示的子进程,但因为在实际应用程序中,几乎从来没有预先填充过队列,并且只在之后读取,但正在读取并且写入与其间的等待时间交错进行。Martijn展示的扩展演示代码在通常的场景中不起作用,因为当排队跟不上读取时,while循环会很快中断。这是重新加载的答案,它能够处理通常的交错馈送&读取场景:


不要依赖队列。空检查同步。

将对象放入空队列后,在队列的empty((方法返回False和get_nowait((可以在不引发队列的情况下返回之前,可能会有一个无限小的延迟。空的…

空((

如果队列为空,则返回True,否则返回False。由于多线程/多处理语义的原因,这是不可靠的。文档

从队列中使用for msg in iter(queue.get, sentinel):.get(),通过传递sentinel值来中断循环。。。iter(可调用,sentinel(?

from multiprocessing import Queue
SENTINEL = None
if __name__ == '__main__':
queue = Queue()
for i in [*range(3), SENTINEL]:
queue.put(i)
for msg in iter(queue.get, SENTINEL):
print(msg)

或者如果需要非阻塞解决方案,则使用get_nowait()并处理可能的queue.Empty异常。

from multiprocessing import Queue
from queue import Empty
import time
SENTINEL = None
if __name__ == '__main__':
queue = Queue()
for i in [*range(3), SENTINEL]:
queue.put(i)
while True:
try:
msg = queue.get_nowait()
if msg == SENTINEL:
break
print(msg)
except Empty:
# do other stuff
time.sleep(0.1)

如果只有一个进程和该进程中的一个线程正在读取队列,那么也可以用交换最后一个代码片段

while True:
if not queue.empty():  # this is not an atomic operation ...
msg = queue.get()  # ... thread could be interrupted in between
if msg == SENTINEL:
break
print(msg)
else:
# do other stuff
time.sleep(0.1)

由于线程可能会在检查if not queue.empty()queue.get()之间丢弃GIL,因此这不适用于进程中的多线程队列读取。如果有多个进程正在从队列中读取,则同样适用。

不过,对于单个生产者/单个消费者的场景,使用multiprocessing.Pipe而不是multiprocessing.Queue就足够了,而且性能更高。

您的代码确实可以工作,有些时候

这是因为队列不是立即不是空的。实现更多地涉及到支持多个进程之间的通信,因此线程和管道会导致empty状态持续的时间比代码允许的时间长一点。

请参阅管道和队列部分中的注释:

当一个对象被放在队列中时,该对象会被pickle,后台线程稍后会将pickle的数据刷新到底层管道。这会产生一些有点令人惊讶的后果,但不应该造成任何实际困难——如果它们真的困扰你,那么你可以使用由管理器创建的队列。

  1. 将对象放入空队列后,队列的empty()方法返回False[…]之前可能会有一个无穷小的延迟

(粗体强调矿(

如果你先添加一个循环来检查是否为空,那么你的代码就可以工作了:

queue = multiprocessing.Queue()
for i in range(3):
queue.put(i)
while queue.empty():
print 'queue is still empty'
while not queue.empty():
print queue.get()

运行以上操作时,大多数情况下'queue is still empty'会出现一次。有时它根本不出现,有时会打印两次。

在使用get:之前检查queue

import multiprocessing
queue = multiprocessing.Queue()
for i in range(3):
queue.put(i)
while not queue.empty():
if not queue.empty():
print queue.get()

相关内容

  • 没有找到相关文章

最新更新