为什么这段代码:
import asyncio
import time
from multiprocessing import Pool, Manager
from threading import Thread, Lock
from aiohttp import ClientSession
async def test(s: ClientSession, lock: Lock, identifier):
print(f'before acquiring {identifier}')
lock.acquire()
print(f'before request {identifier}')
async with s.get('http://icanhazip.com') as r:
print(f'after request {identifier}')
lock.release()
print(f'after releasing {identifier}')
async def main(lock: Lock):
async with ClientSession() as s:
await asyncio.gather(test(s, lock, 1), test(s, lock, 2))
def run(lock: Lock):
asyncio.run(main(lock))
if __name__ == '__main__':
# Thread(target=run, args=[Lock()]).start()
with Pool(processes=1) as pool:
pool.map(run, [Manager().Lock()])
指纹:
before acquiring 1
before request 1
before acquiring 2
然后卡住了?为什么标识符为 1 的请求未执行?与线程相同(已评论)尝试处理请求,工作。
发生这种情况是因为您将同步锁(阻塞整个执行线程)与asyncio
混合在一起,后者要求所有操作都是非阻塞的。您的两个协程(对test
的两个调用)都在同一个线程中运行,因此当第二个协程尝试获取锁但被阻止时,它也会阻止第一个协程(持有锁)进行任何其他进展。
您可以改用asyncio.Lock
来解决此问题。它只会阻塞等待锁的协程,而不是阻塞整个线程。请注意,此锁不能在进程之间传递,因此除非您停止使用multiprocessing
,否则它将无法工作,这在上面的示例代码中实际上不是必需的。您只需创建一个仅在单个子进程中使用的锁,因此您只需在子进程中创建asyncio.Lock
,而不会丢失任何功能。
但是,如果您的实际用例需要一个asyncio
友好的锁,该锁也可以在进程之间共享,您可以使用aioprocessing
来实现这一点(完全披露:我是aioprocessing
的作者)。