队列的正确实现.在 Python 中多线程时排队



我正在学习python,并编写了一些简单的脚本来给自己各种主题的实际示例。其中一个是这个脚本来演示如何排队。Queue() 可以与线程一起使用。Thread() 来创建后台工作线程。不过它的行为很奇怪。我进行了一些计时赛。只需一个线程,它就可以如您所期望的那样...完成 20 个任务大约需要 2 秒(实际上不到??)40 秒。对于四个线程,它再次如您所期望的那样执行。它一次完成 4 个任务,因此大约需要 10 秒。那么,当我运行 20 个线程时,地球需要 0.01 秒(1 平方英尺),---肯定必须花费 2 秒???

这是代码:

import threading
from queue import Queue
import time
q = Queue()
tLock = threading.Lock()
def run() :
    while True :
        task = q.get()  
        print('Just finished task number',task)
        q.task_done()   
        time.sleep(2)
def main() :
    # worker threads are activated  
    for x in range(20) :
        t = threading.Thread(target=run)
        t.daemon = True
        t.start()
    #20 jobs are put in the queue   
    for x in range(1,21) :
        q.put(x)
    #waits until queue is empty and then continues
    q.join()
if __name__ == '__main__' :
    startTime = time.time()
    main()
    print('Time taken was', time.time() - startTime)

你实际上并没有阻止主线程的进度:

"正确的"(*) 方法是通过连接所有线程来确保所有线程都已完成:

def main() :
    # keep reference to threads
    threads = [threading.Thread(target=run) for _ in range(20)]
    # start all threads
    for t in threads:
        t.start()
    #20 jobs are put in the queue   
    for x in range(1,21) :
        q.put(x)
    #waits until queue is empty and then continues
    q.join()
    # join all threads
    for t in threads:
        t.join()

*但是,这不起作用,因为您的线程处于无限循环中,甚至任务也已完成。

因此,另一种方法是确保在报告任务之前等待:

def run() :
    while True :
        task = q.get()  
        # simulate processing time *before* actual reporting
        time.sleep(2)
        print('Just finished task number',task)  
        q.task_done()

尽管如此,线程仍然被阻塞。你得到的是给线程的消息,告诉他们退出。像这样:

def run() :
    while True :
        task = q.get()
        if task == 'stop':
            break  
        # simulate processing time *before* actual reporting
        time.sleep(2)
        print('Just finished task number',task)  
        q.task_done()

现在只需告诉主线程放置足够的停止消息,以便所有线程最终退出其无限循环:

def main() :
    # keep reference to threads
    threads = [threading.Thread(target=run) for _ in range(20)]
    # start all threads
    for t in threads:
        t.start()
    #20 jobs are put in the queue   
    for x in range(1,21):
        q.put(x)
    for x in range(20):
        # stop all threads after tasks are done
        q.put('stop')
    # waits until queue is empty and then continues
    q.join()
    # join all threads
    for t in threads:
        t.join()

提示:您不应使用"幻数",例如 20 。在模块级别中具有一个名为 THREADS_COUNT 的全局变量,因此当您想要测试不同的配置时,您只需更改一个位置。

最新更新