在python多生产者和多消费者线程中,queue.join()可能不可靠吗?



python多制作人&多消费者线程伪代码:

def threadProducer():
    while upstreams_not_done:
        data = do_some_work()
        queue_of_data.put(data)
def threadConsumer():
    while True:
        data = queue_of_data.get()
        do_other_work()
        queue_of_data.task_done()
queue_of_data = queue.Queue()
list_of_producers = create_and_start_producers()
list_of_consumers = create_and_start_consumers()
queue_of_data.join()
# is now all work done?

对队列中的每个项目调用queue_of_data.task_done()

生产者的工作速度比消费者慢时,是否存在queue_of_data.join() 不阻塞的可能性,当没有生产者生成数据,但所有消费者都通过task_done()完成他们的任务 ?

如果Queue.join()不像这样可靠,我如何检查是否所有工作都完成了?

通常的方法是在队列上放置一个哨兵值(如None),当生产者完成时,每个消费者线程一个。然后,当线程从队列中取出None时,写入消费者以退出线程。

因此,例如,在主程序中:

for t in list_of_producers:
    t.join()
# Now we know all producers are done.
for t in list_of_consumers:
    queue_of_data.put(None)  # tell a consumer we're done
for t in list_of_consumers:
    t.join()

和消费者看起来像:

def threadConsumer():
    while True:
        data = queue_of_data.get()
        if data is None:
            break
        do_other_work()

注意:如果生产者可以压倒消费者,创建一个最大大小的队列。然后,当队列达到该大小时,queue.put()将阻塞,直到消费者从队列中删除某些内容。

最新更新