在gather(*tasks)之后动态添加新任务



例如:

async def f1(num):
while True:
print(num)
await asyncio.sleep(2)
class ExampleClass:
def __init__():
self.tasks = []
async def main():
for i in range(10):
tasks.append(asyncio.create_task(f1(i)))
await asyncio.gather(*tasks)
def add_new_task(task):
self.tasks.append(task)

然后在外面某处我呼唤

ExampleClass.add_new_task(task)

我需要的是添加新的任务,并与现有的任务异步执行。

也许我应该使用任何其他结构来实现我想要的?重要的是,我的任务可能需要永远执行(永远轮询)

asyncio.gather函数对于这样的任务并不是很方便。然而,你可以看看asyncio.TaskGroup,它在Python3.11中发布,允许更容易地动态添加新任务。

import asyncio
from concurrent.futures import wait, FIRST_COMPLETED

async def main():
async with asyncio.TaskGroup() as group:
# Create some tasks
tasks = [
group.create_task(asyncio.sleep(1.0))
for _ in range(10)
]

# Wait for some tasks to finish
done, tasks = await wait(tasks, return_when=FIRST_COMPLETED)
# Add more tasks (/! `tasks` is now a set)
tasks.add(group.create_task(asyncio.sleep(2.0)))

# Wait the rest to complete 
await asyncio.gather(*tasks)

if __name__ == "__main__":
asyncio.run(main())

所以,我想到了一个可能的解决方案,那就是路易斯。近似的想法是使用假异步有限循环,其中执行所有任务,并在asyncio中更新新任务。TaskGroup:

async def fake_generator():
while True:
yield None
fake_gen = fake_generator()
async with asyncio.TaskGroup() as tg:
async for _ in fake_gen:
await add_new_data()
for element in self.data:
if not self.data[element]['in_use']:
tg.create_task(job(element))
self.data[element]['in_use'] = True
await asyncio.sleep(60 * 60)

最新更新