在一系列异步任务中使用超时



希望在每个任务中嵌入超时,并为这些任务序列提供总体超时。

[编辑] 根据@Barak的输入简化代码

import asyncio
from random import uniform, seed
seed(1234)
async def fast(n):
s = uniform(1.5, 1.9)
print(f'fast work {n} starts and sleeps for {s} seconds')
await asyncio.sleep(s)
r = f'fast work {n} is completed!'
print(r)
return r
async def slow(n):
print(f'slow operation {n} starts and sleeps for 3 seconds')
await asyncio.sleep(3)
r = f'slow operation {n} is finished'
r = print(r)
return r
async def maketasks():
# collect the tasks
tasks = []
tasks.append([asyncio.create_task(fast(i)) for i in range(3)])
await slow(4) # this should immediately start!
tasks.append(asyncio.create_task(fast(4)))
return asyncio.gather(*tasks) # returns awaitables
async def dotasks(tasks, timeout):
try:
await asyncio.wait_for(tasks, timeout=timeout)
except asyncio.TimeoutError:
e_msg = f'Timed out after waiting {timeout} seconds in wait_for_complete'
print(e_msg)
return tasks, e_msg
return tasks
if __name__ == "__main__":
result = asyncio.run(dotasks(maketasks(), timeout=2))
print(result)

[预期输出]

  1. 对于超时<3,我希望 result 能为我提供一个由fast生成的字符串列表,并在任务中slow函数 result((.done(( 中。相反,我得到了一个coroutine对象,如下所示...
...
fast work 0 is completed!
Timed out after waiting 2 seconds in wait_for_complete
(<coroutine object maketasks at 0x05AA4568>, 'Timed out after waiting 2 seconds in wait_for_complete')
  1. 对于超时> 3,程序将失败并显示TypeError: unhashable type: 'list'错误,适用于:
if __name__ == "__main__":
result = asyncio.run(dotasks(maketasks(), timeout=4))
print(result)

。虽然它会打印出消息并完成所有任务,包括 fast(4(

  1. 感激知道我哪里出错了?

  2. 我应该怎么做才能从slowfast函数中获取字符串列表的结果,并遵守dotasks()中提供的超时?

如果要创建收集任务,则在完成其中的所有任务之前,该任务不会完成。我不确定像在 for 循环中那样迭代它是什么意思。

您可以通过在完成等待收集后检查其状态来从各个任务中提取结果:

async def wait_for_complete(*tasks, timeout=2):
# This code assumes that the inputs are tasks, not coroutines!
combined = asyncio.gather(*tasks)
try:
await asyncio.wait_for(combined, timeout=timeout)
return [t.result() for t in tasks]
except asyncio.TimeoutError:
return [t.result() if t.done() else 'Timeout'
for t in tasks]

最新更新