为什么 asyncio 子进程在创建的事件循环中的行为不同,除非您set_event_loop



我有一个简单的异步代码,它会生成sleep 3然后等待它完成:

from asyncio import SelectorEventLoop, create_subprocess_exec, 
    wait_for, get_event_loop, set_event_loop

def run_timeout(loop, awaitable, timeout):
    timed_awaitable = wait_for(awaitable, timeout=timeout, loop=loop)
    return loop.run_until_complete(timed_awaitable)
async def foo(loop):
    process = await create_subprocess_exec('sleep', '3', loop=loop)
    await process.wait()
    print(process.returncode)

请注意它是如何采用自定义loop的。如果我用以下内容运行它:

loop = get_event_loop()
run_timeout(loop, foo(loop), 5)
loop.close()

它按预期工作(3 秒后sleep 3成功完成并打印0(。但是,如果我使用自己的事件循环运行它:

loop = SelectorEventLoop()
run_timeout(loop, foo(loop), 5)
loop.close()

我得到一个TimeoutError(来自run_timeoutwait_for(:

Traceback (most recent call last):
  File "test.py", line 15, in <module>
    _run_async(loop, foo(loop), 5)
  File "test.py", line 7, in _run_async
    return loop.run_until_complete(timed_coroutine)
  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/usr/lib/python3.5/asyncio/tasks.py", line 396, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

我可以让我的自定义事件循环工作的唯一方法是如果我在创建自己的SelectorEventLoopset_event_loop()

loop = SelectorEventLoop()
set_event_loop(loop)
run_timeout(loop, foo(loop), 3)
loop.close()

这里给了什么?我误解了文档吗?是否必须将所有事件循环(您使用的(都设为默认循环?如果是这样,允许将自定义loop传递到许多异步方法(例如。 create_subprocess_execwait_for (,因为唯一可以传入的值是 get_event_loop() ,这是默认值。

这真的很奇怪。我调试了程序,发现很难说它是否是一个错误。

长话短说,在执行create_subprocess_exec时,您不仅需要一个事件循环,还需要一个子观察程序(用于监视子进程(。但是create_subprocess_exec没有提供一种让您设置自定义子观察器的方法,它只是使用附加到默认事件循环而不是当前正在运行的事件循环的默认观察器。

如果使用以下代码,它将起作用:

from asyncio import SelectorEventLoop, create_subprocess_exec, 
    wait_for, get_event_loop, set_event_loop, get_child_watcher

def run_timeout(loop, awaitable, timeout):
    timed_awaitable = wait_for(awaitable, timeout=timeout)
    return loop.run_until_complete(timed_awaitable)
async def foo():
    process = await create_subprocess_exec('sleep', '3')
    await process.wait()
    print(process.returncode)
loop = SelectorEventLoop()
# core line, get default child watcher and attach it to your custom loop.
get_child_watcher().attach_loop(loop)
run_timeout(loop, foo(), 5)
loop.close()

如果使用set_event_loop设置默认循环,它还会将默认子观察程序重新附加到新的默认循环。这就是它起作用的原因。


很难说这是一个错误还是关于 API 设计的问题。create_subprocess_exec是否应该让您通过自定义观察程序?如果应该,它会产生混乱,因为您只会在使用子进程时触摸子观察程序。

相关内容

最新更新