我一直在玩Python的asyncio
。我想我现在已经有了合理的理解。但以下行为让我感到困惑。
测试.py:
from threading import Thread
import asyncio
async def wait(t):
await asyncio.sleep(t)
print(f'waited {t} sec')
def run(loop):
loop.run_until_complete(wait(2))
loop = asyncio.get_event_loop()
t = Thread(target=run, args=(loop,))
t.start()
loop.run_until_complete(wait(1))
t.join()
这个代码是错误的我知道。事件循环在运行时无法运行,而且它通常不是线程安全的。
我的问题是:为什么wait(1)
有时仍然可以完成它的工作?
以下是连续两次运行的输出:
>>> py test.py
... Traceback (most recent call last):
... File "test.py", line 14, in <module>
... loop.run_until_complete(wait(1))
... File "C:PythonPython37libasynciobase_events.py", line 555, in run_until_complete
... self.run_forever()
... File "C:PythonPython37libasynciobase_events.py", line 510, in run_forever
...
... raise RuntimeError('This event loop is already running')
... RuntimeError: This event loop is already running
... waited 2 sec
>>> py test.py
... Traceback (most recent call last):
... File "test.py", line 14, in <module>
... loop.run_until_complete(wait(1))
... File "C:PythonPython37libasynciobase_events.py", line 555, in run_until_c
... omplete
... self.run_forever()
... File "C:PythonPython37libasynciobase_events.py", line 510, in run_forever
...
... raise RuntimeError('This event loop is already running')
... RuntimeError: This event loop is already running
... waited 1 sec
... waited 2 sec
第一次运行的行为是我所期望的——主线程失败,但事件循环仍然运行wait(2)
以在线程t
中完成。
第二次运行令人困惑,当RuntimeError
已经抛出时,wait(1)
怎么能完成它的工作?我想这与线程同步和事件循环的非线程安全性有关。但我不知道这到底是怎么回事。
哦。。。没关系。我读了asyncio
的代码并弄明白了。其实很简单。
run_until_complete
在检查self.is_running()
(在run_forever
中完成(之前调用ensure_future(future, loop=self)
。由于循环已经在运行,它可以在抛出RuntimeError
之前获取任务。当然,由于比赛条件的原因,这种情况并不总是发生。
每个线程都会抛出异常。运行时错误是在事件循环的另一个线程中引发的。不管怎样,事件循环都会继续执行。
wait(1)
有时可以完成它的工作,因为你可以运气好。asyncio
循环内部数据结构不受使用线程引起的竞争条件的影响(这就是为什么应该使用特定的线程支持方法(。但比赛条件的性质取决于事件的确切顺序,每次运行程序时,这种顺序都可能发生变化,这取决于操作系统当时正在做什么。
run_until_complete()
方法首先调用asyncio.ensure_task()
将协程添加到任务队列中,并附加一个将再次停止事件循环的"done"回调,然后调用loop.run_forever()
。当协程返回时,回调将停止循环。loop.run_forever()
调用在此处抛出RuntimeError
。
当您从线程执行此操作时,任务将被添加到附加到循环的deque对象中,如果这发生在正确的时刻(例如,当运行的循环不忙于清空队列时(,则主线程中的运行循环将找到并执行它,即使loop.run_forever()
调用引发异常。
所有这些都依赖于实现细节。不同版本的Python在这里可能会表现出不同的行为,如果你安装一个替代循环(例如uvloop
(,几乎肯定会再次出现不同的行为。
如果您想从不同的线程调度协同程序,请使用asyncio.run_coroutine_threadsafe()
;它会:
from threading import Thread
import asyncio
async def wait(t):
print(f'going to wait {t} seconds')
await asyncio.sleep(t)
print(f'waited {t} sec')
def run(loop):
asyncio.run_coroutine_threadsafe(wait(2), loop)
loop = asyncio.get_event_loop()
t = Thread(target=run, args=(loop,))
t.start()
loop.run_until_complete(wait(1))
t.join()
上面的内容实际上并没有完成wait(2)
协程,因为wait(1)
协程是与loop.run_until_complete()
一起运行的,所以它的回调在2秒等待结束之前再次停止循环。但协同程序实际上已经开始了:
going to wait 1 seconds
going to wait 2 seconds
waited 1 sec
但是,如果您让主线程协同程序花费更长的时间(比如wait(3)
(,那么从线程调度的协同程序也会完成。您必须做额外的工作,以确保在关闭循环之前,没有更多的挂起任务计划与循环一起运行。