重新启动异步协程,而无需等待其他协程完成



我正在用Python练习异步编程,但有以下问题:

模拟多个人用设定数量的食物从同一个食物碗中进食。每个人一次可以吃x份食物,然后咀嚼食物y秒(模拟阻止呼叫)。只要碗里还有食物,一个人就可以独立于其他人拿走和咀嚼他们的食物。

为每个食客和食物碗定义类。最终目标是在食物碗类中有一个功能,它接受人员列表并让他们从碗开始进食,直到碗空了。每当有人从碗里拿食物时,都应该打印一条消息。

例如,如果我有一个食物碗,里面有 25 份食物,三个人,A、B 和 C:

  • A 一次服用 2 份食物,咀嚼 3 秒
  • B一次服用3份食物,咀嚼4秒
  • C一次服用5份食物,咀嚼2秒

因此,预期输出(打印到标准输出)应为:

(t=0) Person A takes 2 servings of food, leaving 23 servings in the bowl.
(t=0) Person B takes 3 servings of food, leaving 20 servings in the bowl.
(t=0) Person C takes 5 servings of food, leaving 15 servings in the bowl.
(t=2) Person C takes 5 servings of food, leaving 10 servings in the bowl.
(t=3) Person A takes 2 servings of food, leaving 8 servings in the bowl.
(t=4) Person B takes 3 servings of food, leaving 5 servings in the bowl.
(t=4) Person C takes 5 servings of food, leaving 0 servings in the bowl.
(t=4) The bowl is empty!

(在像t=4两个人准备再吃一份的时候,顺序并不重要) 代码是我的尝试:

import asyncio
import time

class Person():
def __init__(self, name, serving_size, time_to_eat):
self.name = name
self.serving_size = serving_size
self.time_to_eat = time_to_eat
async def eat_from(self, foodbowl):
servings_taken = self.serving_size if foodbowl.qty >= self.serving_size else foodbowl.qty
foodbowl.qty -= servings_taken
t = round(time.time() - foodbowl.start_time)
print("(t={}) Person {} picks up {} servings of food, leaving {} servings in the bowl.".format(t, self.name, servings_taken, foodbowl.qty))
await asyncio.sleep(self.time_to_eat)
return servings_taken

class FoodBowl():
def __init__(self, qty):
self.qty = qty
async def assign_eaters(self, eaters):
self.start_time = time.time()
while self.qty > 0:
await asyncio.gather(*[eater.eat_from(self) for eater in eaters])
t = round(time.time() - self.start_time)
print("The bowl is empty!")

bowl = FoodBowl(25)
person_1 = Person("A", 2, 3)
person_2 = Person("B", 3, 4)
person_3 = Person("C", 5, 2)
asyncio.run(bowl.assign_eaters([person_1, person_2, person_3]))

但是,我的尝试会导致以下行为:

(t=0) Person A picks up 2 servings of food, leaving 23 servings in the bowl.
(t=0) Person B picks up 3 servings of food, leaving 20 servings in the bowl.
(t=0) Person C picks up 5 servings of food, leaving 15 servings in the bowl.
(t=4) Person A picks up 2 servings of food, leaving 13 servings in the bowl.
(t=4) Person B picks up 3 servings of food, leaving 10 servings in the bowl.
(t=4) Person C picks up 5 servings of food, leaving 5 servings in the bowl.
(t=8) Person A picks up 2 servings of food, leaving 3 servings in the bowl.
(t=8) Person B picks up 3 servings of food, leaving 0 servings in the bowl.
(t=8) Person C picks up 0 servings of food, leaving 0 servings in the bowl.
The bowl is empty!

可以看出,每个人都等大家吃完了份,才再次伸手去拿碗。查看我的代码,我知道这是因为我在等待 eat 函数asyncio.gather(),因此它会等待所有三个人吃完饭,然后才能再次开始进食。

我知道这是错误的,但我不知道我可以在asyncio库中使用什么来解决这个问题。我正在考虑只要碗里还有食物,eat_from协程就会自动重新启动。我该如何实现此目的,或者是否有更好的方法来解决此问题?

我知道[等待所有三个人都吃完饭后才能再次开始进食]是错误的,但我不知道我可以在 asyncio 库中使用什么来解决这个问题。

您可以使用wait(return_when=asyncio.FIRST_COMPLETED)来等待任何进食者完成,而不是像当前代码那样等待所有进食者。每当一个进食者完成进食时,都会为同一个进食者生成一个新的协程,有效地"重新启动"它。这需要从wait返回给进食者的任务中引用;这样的引用可以很容易地附加到Task对象。代码可能如下所示:

async def assign_eaters(self, eaters):
self.start_time = time.time()
# create the initial tasks...
pending = [asyncio.create_task(eater.eat_from(self))
for eater in eaters]
# ...and store references to their respective eaters
for t, eater in zip(pending, eaters):
t.eater = eater
while True:
done, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_COMPLETED)
if self.qty == 0:
break
for t in done:
# re-create the coroutines that have finished
new = asyncio.create_task(t.eater.eat_from(self))
new.eater = t.eater
pending.add(new)
t = round(time.time() - self.start_time)
print("The bowl is empty!")

这会产生预期的输出,但代价是有些复杂性。但是,如果你准备改变你的方法,有一个更简单的可能性:让每个吃东西的人都是一个独立的参与者,继续吃,直到碗里没有更多的食物。然后你不需要"重新启动"食客,仅仅因为他们一开始就不会退出,至少只要碗里有食物:

async def eat_from(self, foodbowl):
while foodbowl.qty:
servings_taken = self.serving_size 
if foodbowl.qty >= self.serving_size else foodbowl.qty
foodbowl.qty -= servings_taken
t = round(time.time() - foodbowl.start_time)
print("(t={}) Person {} picks up {} servings of food, "
"leaving {} servings in the bowl."
.format(t, self.name, servings_taken, foodbowl.qty))
await asyncio.sleep(self.time_to_eat)

assign_eaters不再需要循环并恢复为使用简单的gather

async def assign_eaters(self, eaters):
self.start_time = time.time()
await asyncio.gather(*[eater.eat_from(self) for eater in eaters])
t = round(time.time() - self.start_time)
print("The bowl is empty!")

这个更简单的代码再次产生预期的输出。唯一的"缺点"是这种变化需要反转控制:碗不再驱动进食过程,现在由每个进食者自主完成,碗被动地等待他们完成。然而,从问题陈述来看,这似乎不仅是可以接受的,甚至可能是抢手的解决方案。据说食物碗功能应该让人们"从碗开始吃,直到碗空了"。"开始吃"意味着碗只是启动这个过程,每个人都自己吃东西——这就是第二个版本的工作方式。

最新更新