下面的代码将三个数字放在一个队列中。然后它尝试从队列中取回数字。但从来没有。如何从队列中获取数据?
import multiprocessing
queue = multiprocessing.Queue()
for i in range(3):
queue.put(i)
while not queue.empty():
print queue.get()
我最初在阅读@Martijn Pieters后删除了这个答案,因为他在更早的时候更详细地描述了"为什么这不起作用"。然后我意识到,OP示例中的用例不太符合的规范名称
"如何使用多访问.Queue.get方法"。
这不是因为没有涉及用于演示的子进程,但因为在实际应用程序中,几乎从来没有预先填充过队列,并且只在之后读取,但正在读取并且写入与其间的等待时间交错进行。Martijn展示的扩展演示代码在通常的场景中不起作用,因为当排队跟不上读取时,while循环会很快中断。这是重新加载的答案,它能够处理通常的交错馈送&读取场景:
不要依赖队列。空检查同步。
将对象放入空队列后,在队列的empty((方法返回False和get_nowait((可以在不引发队列的情况下返回之前,可能会有一个无限小的延迟。空的…
空((
如果队列为空,则返回True,否则返回False。由于多线程/多处理语义的原因,这是不可靠的。文档
从队列中使用for msg in iter(queue.get, sentinel):
到.get()
,通过传递sentinel值来中断循环。。。iter(可调用,sentinel(?
from multiprocessing import Queue
SENTINEL = None
if __name__ == '__main__':
queue = Queue()
for i in [*range(3), SENTINEL]:
queue.put(i)
for msg in iter(queue.get, SENTINEL):
print(msg)
或者如果需要非阻塞解决方案,则使用get_nowait()
并处理可能的queue.Empty
异常。
from multiprocessing import Queue
from queue import Empty
import time
SENTINEL = None
if __name__ == '__main__':
queue = Queue()
for i in [*range(3), SENTINEL]:
queue.put(i)
while True:
try:
msg = queue.get_nowait()
if msg == SENTINEL:
break
print(msg)
except Empty:
# do other stuff
time.sleep(0.1)
如果只有一个进程和该进程中的一个线程正在读取队列,那么也可以用交换最后一个代码片段
while True:
if not queue.empty(): # this is not an atomic operation ...
msg = queue.get() # ... thread could be interrupted in between
if msg == SENTINEL:
break
print(msg)
else:
# do other stuff
time.sleep(0.1)
由于线程可能会在检查if not queue.empty()
和queue.get()
之间丢弃GIL,因此这不适用于进程中的多线程队列读取。如果有多个进程正在从队列中读取,则同样适用。
不过,对于单个生产者/单个消费者的场景,使用multiprocessing.Pipe
而不是multiprocessing.Queue
就足够了,而且性能更高。
您的代码确实可以工作,有些时候。
这是因为队列不是立即不是空的。实现更多地涉及到支持多个进程之间的通信,因此线程和管道会导致empty
状态持续的时间比代码允许的时间长一点。
请参阅管道和队列部分中的注释:
当一个对象被放在队列中时,该对象会被pickle,后台线程稍后会将pickle的数据刷新到底层管道。这会产生一些有点令人惊讶的后果,但不应该造成任何实际困难——如果它们真的困扰你,那么你可以使用由管理器创建的队列。
- 将对象放入空队列后,队列的
empty()
方法返回False
[…]之前可能会有一个无穷小的延迟
(粗体强调矿(
如果你先添加一个循环来检查是否为空,那么你的代码就可以工作了:
queue = multiprocessing.Queue()
for i in range(3):
queue.put(i)
while queue.empty():
print 'queue is still empty'
while not queue.empty():
print queue.get()
运行以上操作时,大多数情况下'queue is still empty'
会出现一次。有时它根本不出现,有时会打印两次。
在使用get
:之前检查queue
import multiprocessing
queue = multiprocessing.Queue()
for i in range(3):
queue.put(i)
while not queue.empty():
if not queue.empty():
print queue.get()