Asyncio,将任务添加到运行循环中



假设我们有12个任务,我们需要在一个条件下运行它们:我们只能有3个任务同时运行。所以我们可以在开始时只启动3个任务,然后直到其中一个任务完成并启动另一个任务。在下面的简单代码中,我使用Asyncio和semafore来实现这个目的。

import asyncio
import random
max_tasks = 12
sem = asyncio.Semaphore(3)

async def counter(n):
print(f'counter with argument {n} has been launched')
for i in range(n):
for j in range(n):
for k in range(n):
pass
await asyncio.sleep(1)
print(f'counter with argument {n} has FINISHED')

async def safe_calc(n):
async with sem:
await counter(n)

async def main():
tasks = [asyncio.ensure_future(safe_calc(random.randint(100, 600))) for _ in range(max_tasks)]
await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()

但是,如果我们有一个动态变量max_tasks,就像它是另一个函数或协程返回我们必须做的任务数量,在主循环运行期间,我们得到这个数字改变,从这一点上我们需要在循环中计算更多的任务?

你能解释一下这一行- "loop.run_until_complete(loop.shutdown_asyncgens())">

你需要围绕生产者-消费者的方式进行编码,这个问题有更多的解释,使用asyncio。队列为生产者-消费者流,但基本上你需要做一个队列,并有工人从它拉项目,在这个答案中,我创建了3个工人,并让他们从队列拉协程,直到队列为空。

import asyncio
import random
max_tasks = 12
tasks_concurrent = 3
async def counter(n):
print(f'counter with argument {n} has been launched')
for i in range(n):
for j in range(n):
for k in range(n):
pass
await asyncio.sleep(1)
print(f'counter with argument {n} has FINISHED')

async def safe_calc(n):
await counter(n)
return n
async def process_queue(queue:asyncio.Queue):
while True:
try:
task = queue.get_nowait()
await task
except asyncio.QueueEmpty:
return
async def main():
coroutines_to_do = asyncio.Queue()  # add tasks here , await tasks_to_do.put(task)
for _ in range(max_tasks):
await coroutines_to_do.put(safe_calc(random.randint(100, 600)))
workers = [asyncio.create_task(process_queue(coroutines_to_do)) for _ in range(tasks_concurrent)]
# put logic here to add extra things to tasks_to_do queue
return await asyncio.gather(*workers)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()

正如您自己指出的,如果您想要改变活动任务的数量,那么使用asyncio.Semaphore是错误的工具。我建议将所需的行为封装在一个小类中。

我想先说几句话。你说你想限制同时执行任务的数量,但我不认为这就是你的意思。我想不出你为什么要这么做。您的示例程序所做的是限制同时执行特定函数counter的任务数量。这不是一回事。这也是一个常见的要求;例如,您可能希望限制对服务器的并行请求的数量。

这段代码:

for i in range(n):
for j in range(n):
for k in range(n):
pass

…在asyncio程序中不是个好东西。它是没有任何await表达式的数字运算。因此,一旦一个任务进入这段代码,它将不允许任何其他任务运行,直到它完成。如果这种CPU密集型代码对于您的应用程序来说确实是一个很好的模型,那么您应该考虑使用多处理,以便让多个CPU内核发挥作用。作为示例的一部分,它使观察asyncio程序的结构变得困难。在我的代码中,我将其替换为一个变量asyncio。睡眠延迟。

这是一个做你想做的事情的类。您可以在构造函数中设置最大计数。同时进入函数calc的任务不超过max_count。条件和一个简单的计数器来限制并行任务的数量。

类还包含一个允许动态调整max_count的方法change_max。当您调用change_max时,它会向Condition对象发出信号,以防现在允许进行门控任务。

为了说明这是如何工作的,一个延迟函数发出对change_max的调用。我用asyncio.run启动事件循环。

import asyncio
import random
class Safe:
def __init__(self, max_count: int):
self.max_count = max_count
self.task_count = 0
self.ready = asyncio.Condition()

def change_max(self, new_max: int):
self.max_count = new_max
asyncio.create_task(self.check_count())
print("Max count is now", new_max)

async def calc(self, n: int):
async with self.ready:
await self.ready.wait_for(lambda: self.task_count < self.max_count)
try:
self.task_count += 1
await self.counter(n)
finally:
self.task_count -= 1
await self.check_count()

async def check_count(self):
async with self.ready:
self.ready.notify_all()
async def counter(self, n: int):
print(f'counter with argument {n} has been launched')
await asyncio.sleep(1.0 + n / 1000)
print(f'counter with argument {n} has FINISHED')

max_tasks = 12
async def main():
safe = Safe(3)
asyncio.get_event_loop().call_later(2.0, lambda: safe.change_max(4))
tasks = [asyncio.create_task(safe.calc(random.randint(500, 1500)))
for _ in range(max_tasks)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())