请看下面的代码(为了简单起见,我没有使用pydantic来对corutine、retries、timeouts进行分组(:
import asyncio
import typing as tp
import random
async def my_func(wait_time: int) -> str:
random_number = random.random()
random_time = wait_time - random_number if random.random() < 0.5 else wait_time + random_number
print(f"waiting for {wait_time}{random_time:+} seconds")
await asyncio.sleep(wait_time)
return f"waited for {wait_time}{random_time:+} seconds"
async def main() -> None:
task1 = asyncio.create_task(my_func(wait_time=1), name='task1')
task2 = asyncio.create_task(my_func(wait_time=2), name='task2')
task3 = asyncio.create_task(my_func(wait_time=3), name='task3')
task1_timeout = 1.2
task2_timeout = 2.2
task3_timeout = 3.2
task1_retry = 4
task2_retry = 3
task3_retry = 2
total_timeout = 5
<what to put here?>
return task1_result, task2_result, task3_result
asyncio.run(main())
正如你所看到的,我有函数my_func(在现实生活中,我会有多个不同的函数(。在main((中,我定义了3个任务。每个任务都有其超时和重试时间。例如,task1超时2秒,重试3次。
此外,我还有另一个(全局(超时total_timeout
,它表示main((必须完成的时间。
例如,如果task1
开始运行,但在1.2秒内没有得到结果,我们最多应该重试4次,所以在根本无法得到结果的情况下,我们仍然低于5秒的timeout_total
。
对于超时2.2秒并且可以重复3次的task2
,在4.4秒完成第二次重复之后,如果我们再次重试,它将在第5秒被total_timeout
截断。
对于task3
,如果我们在第一次尝试中没有完成,我们就没有足够的时间进行第二次尝试(total_timeout
(。
我希望同时执行所有三个任务,同时考虑它们各自的超时和重试,以及total_timeout
。最后,在长达5秒后,我将获得三个元素的元组,它们将是str(my_func的输出(或None(以防所有重复都失败,或者任务已被total_timeout
切断(。所以输出可以是(str, str, str)
、(str, None, str)
或(None, None, None)
。
有人能提供一些示例代码来实现我所描述的功能吗?
我认为这是一个很好的问题。我提出了这个解决方案,它结合了asyncio.gather((和asyncio.wait_forr((.
在这里,第三个任务被要求等待5秒,超时3.2秒[重试2次],并将返回None,因为异步。TimeoutError将被引发(并捕获(。
import asyncio
import random
import sys
total_timeout = float(sys.argv[1]) if len(sys.argv) > 1 else 5.0
async def work_coro(wait_time: int) -> str:
random_number = random.random()
random_time = wait_time - random_number if
random.random() < 0.5 else wait_time + random_number
if random_number > 0.7:
raise RuntimeError('Random sleep time too high')
print(f"waiting for {wait_time}{random_time:+} seconds")
await asyncio.sleep(random_time)
return f"waited for {wait_time}{random_time:+} seconds"
async def coro_trunner(wait_time: int,
retry: int,
timeout: float) -> str:
"""
Run work_coro in a controlled timing environment
:param int wait_time: How long the coroutine will sleep on each run
:param int retry: Retry count (if the coroutine times out, retry x times)
:param float timeout: Timeout for the coroutine
"""
for attempt in range(0, retry):
try:
start_time = loop.time()
print(f'{work_coro}: ({wait_time}, {retry}, {timeout}): '
'spawning')
return await asyncio.wait_for(work_coro(wait_time),
timeout)
except asyncio.TimeoutError:
diff_time = loop.time() - start_time
print(f'{work_coro}: ({wait_time}, {retry}, {timeout}): '
f'timeout (diff_time: {diff_time}')
continue
except asyncio.CancelledError:
print(f'{work_coro}: ({wait_time}, {retry}, {timeout}): '
'cancelled')
break
except Exception as err:
# Unknown error raised in the worker: give it another chance
print(f'{work_coro}: ({wait_time}, {retry}, {timeout}): '
f'error in worker: {err}')
continue
async def main() -> list:
tasks = [
asyncio.create_task(coro_trunner(1, 2, 1.2)),
asyncio.create_task(coro_trunner(2, 3, 2.2)),
asyncio.create_task(coro_trunner(5, 5, 5.2))
]
try:
gaf = asyncio.gather(*tasks)
results = await asyncio.wait_for(gaf,
total_timeout)
except (asyncio.TimeoutError,
asyncio.CancelledError,
Exception):
# Total timeout reached: get the results that are ready
# Consume the gather exception
exc = gaf.exception() # noqa
results = []
for task in tasks:
if task.done():
results.append(task.result())
else:
# We want to know when a task yields nothing
results.append(None)
task.cancel()
return results
print(f'Total timeout: {total_timeout}')
loop = asyncio.get_event_loop()
start_time = loop.time()
results = asyncio.run(main())
end_time = loop.time()
print(f'{end_time - start_time} --> {results}')