Coroutine with a custom timeout implementation



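I'm using asyncio to run a subprocess that prints some output, which I parse line by line, doing various things depending on the output I see. I want to put a timeout on this process, but it shouldn't be a global timeout for the whole lifetime of the process. Instead, whenever I see certain specific output from the process, I want to reset the timeout so that it starts over. How can I implement this?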

For a global timeout this is easy: I just call asyncio.wait_for(_foo(), timeout). But I can't get it to work with a resettable timeout. Here's what I have so far:

# here invocation is my own data structure with some bookkeeping information in it
# (such as the start time from which I want to base my timeout decisions on).
# and process is the value returned by await asyncio.create_subprocess_exec(...)
# _run_one_invocation() is my own function which is just a stdout readline loop
# and some dispatching.
# Make a Task out of the co-routine so that we decide when it gets cancelled, not Python.
run_task = asyncio.Task(_run_one_invocation(invocation, process))
while True:
    try:
        # Use asyncio.shield so it doesn't get cancelled if the timeout expires
        await asyncio.shield(asyncio.wait_for(run_task, 1))
        # If the await returns without raising an exception, we got to EOF and we're done.
        break
    except asyncio.TimeoutError:
        # If it's been too long since last reset, this is a "real" timeout.
        duration = time.time() - invocation.start_time
        if duration > timeout:
            run_task.cancel()
            raise

When I run this, the if statement that calls run_task.cancel() is NOT entered, but when I go back to the top of the loop and call asyncio.wait_for() again, it immediately raises asyncio.CancelledError.

What am I doing wrong?

You can fix the problem and simplify the code by avoiding wait_for() altogether (and with it shield()) and just using wait(return_when=FIRST_COMPLETED) to implement the timeout you need:

import asyncio
import time

run_task = asyncio.create_task(_run_one_invocation(invocation, process))
while True:
    # wake up at least once per second to re-check the (resettable) deadline
    await asyncio.wait([run_task], timeout=1)
    if run_task.done():
        break
    if time.time() - invocation.start_time > timeout:
        run_task.cancel()
        raise asyncio.TimeoutError()
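To try the polling loop outside the question's program, here is a minimal self-contained sketch. Invocation, fake_run(), run_with_polling() and demo() are hypothetical names invented for this demo (they stand in for the question's bookkeeping object and _run_one_invocation()). It swaps time.time() for time.monotonic(), an assumption on my part, so that wall-clock adjustments cannot trigger or postpone the timeout, and it makes the poll interval a parameter so the demo can use short delays.

```python
import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class Invocation:
    # Hypothetical stand-in for the question's bookkeeping object.
    start_time: float = field(default_factory=time.monotonic)


async def fake_run(invocation, resets, gap):
    # Hypothetical worker: every `gap` seconds it "sees" interesting output
    # and resets the timeout; after `resets` such events it goes silent.
    for _ in range(resets):
        await asyncio.sleep(gap)
        invocation.start_time = time.monotonic()
    await asyncio.sleep(3600)


async def run_with_polling(coro, invocation, timeout, poll=1.0):
    run_task = asyncio.ensure_future(coro)
    while True:
        # wait() never cancels run_task; it just returns after `poll` seconds
        await asyncio.wait([run_task], timeout=poll)
        if run_task.done():
            return run_task.result()
        if time.monotonic() - invocation.start_time > timeout:
            run_task.cancel()
            raise asyncio.TimeoutError()


async def demo():
    inv = Invocation()
    t0 = time.monotonic()
    try:
        await run_with_polling(fake_run(inv, resets=3, gap=0.1), inv,
                               timeout=0.25, poll=0.05)
    except asyncio.TimeoutError:
        return time.monotonic() - t0  # seconds until the timeout fired
    return None


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a reset every 0.1 s and a 0.25 s timeout, the timeout should fire roughly 0.25 s after the last reset (around 0.55 s in total) rather than 0.25 s after the start, and it can be detected up to one poll interval late.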

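The downside of this approach is that it introduces a wakeup every second, which keeps the program (and the computer) from going to sleep even if the task sleeps for hours. On a server that may not be a big deal, but this kind of practice contributes to battery drain on laptops and is best avoided. Also, the 1-second sleep adds up to 1 second of latency in reacting to a change in the timeout.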

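To address that, you can create an event that is set by the code that changes the timeout, and react to that event in addition to the timeout and to task completion: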

import asyncio
import time

timeout_changed = asyncio.Event()
# pass timeout_changed where needed, and have the code that changes
# the timeout also call timeout_changed.set()
run_task = asyncio.create_task(_run_one_invocation(invocation, process))
while True:
    remaining = timeout - (time.time() - invocation.start_time)
    timeout_changed_task = asyncio.ensure_future(timeout_changed.wait())
    await asyncio.wait([run_task, timeout_changed_task],
                       return_when=asyncio.FIRST_COMPLETED, timeout=remaining)
    timeout_changed_task.cancel()
    # re-arm the event; otherwise wait() returns immediately from now on
    timeout_changed.clear()
    # either: 1) the task has completed, 2) the previous timeout has
    # expired, or 3) the timeout has changed
    if run_task.done():
        break  # 1
    if time.time() - invocation.start_time > timeout:
        # 2 or 2+3
        run_task.cancel()
        raise asyncio.TimeoutError()
    # 3 - continue waiting with the new timeout
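A self-contained sketch of the event-driven version, again with hypothetical names (Invocation, reset_timeout(), fake_run(), run_with_resettable_timeout() and demo() are illustrative, not from the question). Here the bookkeeping object owns the Event and sets it whenever the deadline moves. The clear() after each wait matters: an asyncio.Event stays set until it is cleared, so without it every later wait() on the event would complete immediately and the loop would spin. Invocation is created inside the running coroutine because on Python versions before 3.10 an asyncio.Event binds to the loop that is current when it is constructed.

```python
import asyncio
import time


class Invocation:
    # Hypothetical bookkeeping object: the reset point plus an Event
    # that is set whenever the deadline moves.
    def __init__(self):
        self.start_time = time.monotonic()
        self.timeout_changed = asyncio.Event()

    def reset_timeout(self):
        self.start_time = time.monotonic()
        self.timeout_changed.set()


async def fake_run(invocation, resets, gap):
    # Hypothetical worker: resets the timeout `resets` times, `gap` s apart,
    # then goes silent.
    for _ in range(resets):
        await asyncio.sleep(gap)
        invocation.reset_timeout()
    await asyncio.sleep(3600)


async def run_with_resettable_timeout(coro, invocation, timeout):
    run_task = asyncio.ensure_future(coro)
    while True:
        remaining = timeout - (time.monotonic() - invocation.start_time)
        changed_task = asyncio.ensure_future(invocation.timeout_changed.wait())
        # sleep until the task ends, the deadline passes, or the deadline moves
        await asyncio.wait([run_task, changed_task],
                           return_when=asyncio.FIRST_COMPLETED,
                           timeout=remaining)
        changed_task.cancel()
        invocation.timeout_changed.clear()
        if run_task.done():
            return run_task.result()
        if time.monotonic() - invocation.start_time >= timeout:
            run_task.cancel()
            raise asyncio.TimeoutError()
        # otherwise the deadline moved: loop and wait with the new remainder


async def demo():
    inv = Invocation()  # created inside the running loop
    t0 = time.monotonic()
    try:
        await run_with_resettable_timeout(fake_run(inv, 3, 0.1), inv, timeout=0.25)
    except asyncio.TimeoutError:
        return time.monotonic() - t0  # seconds until the timeout fired
    return None


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Unlike the polling loop, nothing here wakes up periodically: each iteration sleeps exactly until one of the three events happens, so the timeout should fire about 0.25 s after the last reset with no extra poll latency.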

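As for why the loop in the question fails: asyncio.wait_for() cancels the awaitable it was given when its timeout expires. In the question that awaitable is run_task itself, so the very first 1-second timeout cancels run_task; shield() only protects the wait_for() call wrapped inside it, not the task that wait_for() is waiting on. The next wait_for(run_task, 1) then sees an already-cancelled task and raises CancelledError. Reversing the nesting, wait_for(shield(run_task), 1), lets the timeout cancel only the shield wrapper. A minimal sketch contrasting the two, with a hypothetical slow_worker() standing in for _run_one_invocation():

```python
import asyncio


async def slow_worker():
    # Hypothetical stand-in for _run_one_invocation(): finishes after ~0.3 s.
    await asyncio.sleep(0.3)
    return "done"


async def wrong_nesting():
    run_task = asyncio.ensure_future(slow_worker())
    try:
        # Question's nesting: on timeout, wait_for() cancels run_task itself.
        await asyncio.shield(asyncio.wait_for(run_task, 0.1))
    except asyncio.TimeoutError:
        pass
    return run_task.cancelled()


async def right_nesting():
    run_task = asyncio.ensure_future(slow_worker())
    timeouts = 0
    while True:
        try:
            # Shield the task: on timeout only the shield wrapper is cancelled.
            result = await asyncio.wait_for(asyncio.shield(run_task), 0.1)
            return result, timeouts
        except asyncio.TimeoutError:
            # run_task keeps running; the real (resettable) deadline check goes here
            timeouts += 1


if __name__ == "__main__":
    print(asyncio.run(wrong_nesting()))
    print(asyncio.run(right_nesting()))
```

The answer's wait()-based loops sidestep this entirely, since asyncio.wait() never cancels the tasks it waits on.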