尽快执行asyncio任务



我想知道如何立即执行任务组'tg_fast',然后继续任务组'tg_main'(或重新开始,如果无法继续)。在使用asyncio.gather()时,结果类似于TaskGroup.

import asyncio

async def another_coro(i):
print(i)
await asyncio.sleep(.1)

async def coro(i):
if i == 1:
async with asyncio.TaskGroup() as tg_fast:
tg_fast.create_task(another_coro(i * 10))
tg_fast.create_task(another_coro(i * 100))
# await asyncio.gather(*[another_coro(i * 10), another_coro(i * 100)])
else:
print(i)
await asyncio.sleep(.1)

async def main():
async with asyncio.TaskGroup() as tg_main:
for i in range(0, 3):
tg_main.create_task(coro(i))

asyncio.run(main(), debug=True)

打印0 =>2 =比;10 =比;100

但我想一个方法来得到:0 =>10 =比;100 =比;…OR 0 =>100 =比;10 =比;…

目标是在0之后和2之前启动10和100。

非常感谢你的帮助。

编辑:我想同时调用'another_coro'。而不是等一个之后再开始第二个。我不需要完成它们,我可以执行它们直到await 'asyncio.sleep(.1')并继续事件循环。

要做到这一点,您必须故意添加另一种机制来确定任务的优先级,并且必须明确地对"非优先级"中的其他任务执行此操作。组。

例如,可以通过子类化asyncio来实现。TaskGroup,并向__aexit__方法添加优先级机制,以便当一个组打算退出时(并且它的所有任务都打算等待),如果有一个具有更高优先级的TaskGroup正在运行,它可以在中央注册表中检查您的专用TaskGroup的所有实例,然后等待直到该TaskGroup退出-

这将不需要改变任务中的任何代码-只是如何实例化您的组-但另一方面,它不会阻止非优先级任务在它们等待的代码中的任何其他点上执行步骤和运行部分(否则会导致asyncio循环)。

另一种方法,我写了下面的代码片段,要求您更改在点上具有较低优先级的任务,并在其中调用专门的sleep(它可以用"0"调用)。延迟,如asyncio.sleep)。放置这些调用的点将成为明确的点,您的任务将优先于应该首先运行的任务。

这允许更大的灵活性,更明确,并保证暂停较低优先级的工作-缺点是您必须显式地添加"检查点"。

可以理解,当有任何其他高优先级任务运行时,修改后的.sleep方法只是不返回。

import asyncio
from heapq import  heappush, heapify
granularity = 0.01

class PriorityGroups:
def __init__(self):
self.priority_queue = []
self.counter = 0
async def sleep(self, delay, priority=10):
counter = self.counter
self.counter += 1
steps = delay / granularity
step_delay = delay / steps
step = 0
heappush(self.priority_queue, (priority, counter))
try:
while step < steps or (self.priority_queue and self.priority_queue[0][0] < priority):
await asyncio.sleep(step_delay)
step += 1
finally:
self.priority_queue.remove((priority, counter))
heapify(self.priority_queue)

priority_group = PriorityGroups()

async def another_coro(i, priority=1):
await priority_group.sleep(.1, priority)
print(i)

async def coro(i):
if i == 1:
async with asyncio.TaskGroup() as tg_fast:
tg_fast.create_task(another_coro(i * 10))
tg_fast.create_task(another_coro(i * 100))
# await asyncio.gather(*[another_coro(i * 10), another_coro(i * 100)])
else:
await priority_group.sleep(.1)
print(i)

async def main():
async with asyncio.TaskGroup() as tg_main:
for i in range(0, 3):
tg_main.create_task(coro(i))

asyncio.run(main(), debug=True)

所以-只需对PriorityGroups.sleep的相同实例进行调用,可选地为优先级传递一个较低的数字(==更优先级),用于应该首先运行的事情。将控制放在PriorityGroups的实例中甚至意味着你可以并行嵌套任务组和优先级任务组,并且一个组不会干扰其他组。

最新更新