为什么异步事件循环有时即使遇到"RuntimeError"也能完成任务



我一直在玩Python的asyncio。我想我现在已经有了合理的理解。但以下行为让我感到困惑。

测试.py

from threading import Thread
import asyncio
async def wait(t):
await asyncio.sleep(t)
print(f'waited {t} sec')
def run(loop):
loop.run_until_complete(wait(2))
loop = asyncio.get_event_loop()
t = Thread(target=run, args=(loop,))
t.start()
loop.run_until_complete(wait(1))
t.join()

这个代码是错误的我知道。事件循环在运行时无法运行,而且它通常不是线程安全的。

我的问题是:为什么wait(1)有时仍然可以完成它的工作?

以下是连续两次运行的输出:

>>> py test.py
... Traceback (most recent call last):
...   File "test.py", line 14, in <module>
...     loop.run_until_complete(wait(1))
...   File "C:PythonPython37libasynciobase_events.py", line 555, in run_until_complete
...     self.run_forever()
...   File "C:PythonPython37libasynciobase_events.py", line 510, in run_forever
... 
...     raise RuntimeError('This event loop is already running')
... RuntimeError: This event loop is already running
... waited 2 sec
>>> py test.py
... Traceback (most recent call last):
...   File "test.py", line 14, in <module>
...     loop.run_until_complete(wait(1))
...   File "C:PythonPython37libasynciobase_events.py", line 555, in run_until_c
... omplete
...     self.run_forever()
...   File "C:PythonPython37libasynciobase_events.py", line 510, in run_forever
... 
...     raise RuntimeError('This event loop is already running')
... RuntimeError: This event loop is already running
... waited 1 sec
... waited 2 sec

第一次运行的行为是我所期望的——主线程失败,但事件循环仍然运行wait(2)以在线程t中完成。

第二次运行令人困惑,当RuntimeError已经抛出时,wait(1)怎么能完成它的工作?我想这与线程同步和事件循环的非线程安全性有关。但我不知道这到底是怎么回事。

哦。。。没关系。我读了asyncio的代码并弄明白了。其实很简单。

run_until_complete在检查self.is_running()(在run_forever中完成(之前调用ensure_future(future, loop=self)。由于循环已经在运行,它可以在抛出RuntimeError之前获取任务。当然,由于比赛条件的原因,这种情况并不总是发生。

每个线程都会抛出异常。运行时错误是在事件循环的另一个线程中引发的。不管怎样,事件循环都会继续执行。

wait(1)有时可以完成它的工作,因为你可以运气好。asyncio循环内部数据结构不受使用线程引起的竞争条件的影响(这就是为什么应该使用特定的线程支持方法(。但比赛条件的性质取决于事件的确切顺序,每次运行程序时,这种顺序都可能发生变化,这取决于操作系统当时正在做什么。

run_until_complete()方法首先调用asyncio.ensure_task()将协程添加到任务队列中,并附加一个将再次停止事件循环的"done"回调,然后调用loop.run_forever()。当协程返回时,回调将停止循环。loop.run_forever()调用在此处抛出RuntimeError

当您从线程执行此操作时,任务将被添加到附加到循环的deque对象中,如果这发生在正确的时刻(例如,当运行的循环不忙于清空队列时(,则主线程中的运行循环将找到并执行它,即使loop.run_forever()调用引发异常

所有这些都依赖于实现细节。不同版本的Python在这里可能会表现出不同的行为,如果你安装一个替代循环(例如uvloop(,几乎肯定会再次出现不同的行为。

如果您想从不同的线程调度协同程序,请使用asyncio.run_coroutine_threadsafe();它会:

from threading import Thread
import asyncio
async def wait(t):
print(f'going to wait {t} seconds')
await asyncio.sleep(t)
print(f'waited {t} sec')
def run(loop):
asyncio.run_coroutine_threadsafe(wait(2), loop)
loop = asyncio.get_event_loop()
t = Thread(target=run, args=(loop,))
t.start()
loop.run_until_complete(wait(1))
t.join()

上面的内容实际上并没有完成wait(2)协程,因为wait(1)协程是与loop.run_until_complete()一起运行的,所以它的回调在2秒等待结束之前再次停止循环。但协同程序实际上已经开始了:

going to wait 1 seconds
going to wait 2 seconds
waited 1 sec

但是,如果您让主线程协同程序花费更长的时间(比如wait(3)(,那么从线程调度的协同程序也会完成。您必须做额外的工作,以确保在关闭循环之前,没有更多的挂起任务计划与循环一起运行。

相关内容

最新更新