我有一个简单的异步代码,它会生成sleep 3
然后等待它完成:
from asyncio import SelectorEventLoop, create_subprocess_exec,
wait_for, get_event_loop, set_event_loop
def run_timeout(loop, awaitable, timeout):
timed_awaitable = wait_for(awaitable, timeout=timeout, loop=loop)
return loop.run_until_complete(timed_awaitable)
async def foo(loop):
process = await create_subprocess_exec('sleep', '3', loop=loop)
await process.wait()
print(process.returncode)
请注意它是如何采用自定义loop
的。如果我用以下内容运行它:
loop = get_event_loop()
run_timeout(loop, foo(loop), 5)
loop.close()
它按预期工作(3 秒后sleep 3
成功完成并打印0
(。但是,如果我使用自己的事件循环运行它:
loop = SelectorEventLoop()
run_timeout(loop, foo(loop), 5)
loop.close()
我得到一个TimeoutError
(来自run_timeout
的wait_for
(:
Traceback (most recent call last):
File "test.py", line 15, in <module>
_run_async(loop, foo(loop), 5)
File "test.py", line 7, in _run_async
return loop.run_until_complete(timed_coroutine)
File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/usr/lib/python3.5/asyncio/tasks.py", line 396, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
我可以让我的自定义事件循环工作的唯一方法是如果我在创建自己的SelectorEventLoop
后set_event_loop()
:
loop = SelectorEventLoop()
set_event_loop(loop)
run_timeout(loop, foo(loop), 3)
loop.close()
这里给了什么?我误解了文档吗?是否必须将所有事件循环(您使用的(都设为默认循环?如果是这样,允许将自定义loop
传递到许多异步方法(例如。 create_subprocess_exec
和 wait_for
(,因为唯一可以传入的值是 get_event_loop()
,这是默认值。
这真的很奇怪。我调试了程序,发现很难说它是否是一个错误。
长话短说,在执行create_subprocess_exec
时,您不仅需要一个事件循环,还需要一个子观察程序(用于监视子进程(。但是create_subprocess_exec
没有提供一种让您设置自定义子观察器的方法,它只是使用附加到默认事件循环而不是当前正在运行的事件循环的默认观察器。
如果使用以下代码,它将起作用:
from asyncio import SelectorEventLoop, create_subprocess_exec,
wait_for, get_event_loop, set_event_loop, get_child_watcher
def run_timeout(loop, awaitable, timeout):
timed_awaitable = wait_for(awaitable, timeout=timeout)
return loop.run_until_complete(timed_awaitable)
async def foo():
process = await create_subprocess_exec('sleep', '3')
await process.wait()
print(process.returncode)
loop = SelectorEventLoop()
# core line, get default child watcher and attach it to your custom loop.
get_child_watcher().attach_loop(loop)
run_timeout(loop, foo(), 5)
loop.close()
如果使用set_event_loop
设置默认循环,它还会将默认子观察程序重新附加到新的默认循环。这就是它起作用的原因。
很难说这是一个错误还是关于 API 设计的问题。create_subprocess_exec
是否应该让您通过自定义观察程序?如果应该,它会产生混乱,因为您只会在使用子进程时触摸子观察程序。