如何正确排队?为什么睡在消费者身上会让排队工作



我正在尝试实现一个队列。这是一段旧代码,它要么取自我不久前做的某种教程,要么取自我阅读文档时做的某种实验,或者两者的混合。问题是,我不确定代码是否是我的,但我正试图将其作为一个学习的例子。脚本中有一个生产者在列表中生成数字,两个消费者竞争获取这些数字并将其相加,金额最高的一个获胜。

所以,我的问题是:在下面的代码中;consumer_numbers";函数我有一个time.sleep(0.01(行,它使代码运行。没有它,代码就会挂起,有了它,代码就能顺利运行。有人能解释为什么会发生这种情况,以及我如何在没有这个问题的情况下实现队列吗?

import concurrent.futures
import time
import random
import threading
import queue

class MyQueue(queue.Queue):
def __init__(self, maxsize=10):
super().__init__()
self.maxsize = maxsize
self.numbers = []
def set_number(self, number):
self.put(number)
self.numbers.append(number)
def get_number(self):
return self.get()

def produce_random_numbers(q: MyQueue, maxcount: int, evnt: threading.Event):
count = 0
while not evnt.is_set():
num = random.randint(1, 5)
q.set_number(num)
count += 1
if count > maxcount:
event.set()

def consume_numbers(q: MyQueue, consumed: list, evnt: threading.Event):
while not q.empty() or not evnt.is_set():
num = q.get_number()
time.sleep(0.01)
consumed.append(num)

if __name__ == "__main__":
q = MyQueue(maxsize=10)
event = threading.Event()
cons1 = []
cons2 = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
ex.submit(produce_random_numbers, q, 50, event)
ex.submit(consume_numbers, q, cons1, event)
ex.submit(consume_numbers, q, cons2, event)
event.set()
print(f'Generated Numbers: {q.numbers}')
print(f'Numbers Consumed by Thread1 which summed up to {sum(cons1)} are: {cons1}')
print(f'Numbers Consumed by Thread2 which summed up to {sum(cons2)} are: {cons2}')
if sum(cons1) > sum(cons2):
print("Thread1 Wins!")
elif sum(cons1) < sum(cons2):
print("Thread2 Wins!")
else:
print("It's a tie!")

谢谢!

该代码不是从头开始实现队列,而是扩展queue.Queue以添加内存。有一个事件对象,用于向使用者发出生产者线程已完成的信号。当队列中只有一个项目时,消费者中存在隐藏的竞争条件。

如果队列中有内容或事件尚未设置,则检查not q.empty() or not evnt.is_set()将运行循环代码。可能发生的情况:

  1. 一个线程发现队列不是空的,并进入循环
  2. 发生线程切换,另一个线程消耗最后一个项目
  3. 第一个线程发生切换,它调用get_number()并阻止

evnt.is_set()检查也会出现类似的竞争情况:

  1. 最后一项由生产者添加到队列中,并发生线程切换
  2. 一个线程消耗最后一个项目,一个开关
  3. 线程切换发生,使用者获得最后一个项目并返回到循环条件。由于事件尚未设置,执行循环并阻止get_number()

让线程等待可以最大限度地减少出现这些情况的可能性。如果不等待,很可能单个使用者线程将消耗所有队列项目,而另一个线程仍在进入其循环。

使用超时很麻烦。避免使用事件的一个有用习惯用法是使用iter并使用一个不可能的值作为哨兵:

# --- snip ---
def produce_random_numbers(q: MyQueue, maxcount: int, n_consumers: int):
for _ in range(maxcount):
num = random.randint(1, 5)
q.set_number(num)
for _ in range(n_consumers):
q.put(None)  # <--- I use put to put one sentinel per consumer

def consume_numbers(q: MyQueue, consumed: list):
for num in iter(q.get_number, None):
consumed.append(num)

if __name__ == "__main__":
q = MyQueue(maxsize=10)
cons1 = []
cons2 = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
ex.submit(produce_random_numbers, q, 500000, 2)
ex.submit(consume_numbers, q, cons1)
ex.submit(consume_numbers, q, cons2)
print(f'Generated Numbers: {q.numbers}')
# --- snip ---

还有一些其他问题和事情我会采取不同的做法:

  • with...块之后的event.set()是无用的:事件已经由生产者设置
  • 生产者中存在拼写错误,使用了全局event变量而不是本地evnt参数。幸运的是,它们指的是同一个对象
  • 由于只有一个生产商,所以不会有任何问题。否则,MyQueue.numbers的顺序可能与项目添加到队列的顺序不同:
    1. put在一个线程上调用
    2. 发生线程切换
    3. 在新线程中发生put+append
    4. 发生线程切换,第一个值为appended
  • 与其定义MyQueue.set_number,我会高估put

最新更新