如果使用锁定,为什么 aiohttp 请求卡住?



为什么这段代码:

import asyncio
import time
from multiprocessing import Pool, Manager
from threading import Thread, Lock
from aiohttp import ClientSession

async def test(s: ClientSession, lock: Lock, identifier):
print(f'before acquiring {identifier}')
lock.acquire()
print(f'before request {identifier}')
async with s.get('http://icanhazip.com') as r:
print(f'after request {identifier}')
lock.release()
print(f'after releasing {identifier}')

async def main(lock: Lock):
async with ClientSession() as s:
await asyncio.gather(test(s, lock, 1), test(s, lock, 2))

def run(lock: Lock):
asyncio.run(main(lock))

if __name__ == '__main__':
# Thread(target=run, args=[Lock()]).start()
with Pool(processes=1) as pool:
pool.map(run, [Manager().Lock()])

指纹:

before acquiring 1
before request 1
before acquiring 2

然后卡住了?为什么标识符为 1 的请求未执行?与线程相同(已评论)尝试处理请求,工作。

发生这种情况是因为您将同步锁(阻塞整个执行线程)与asyncio混合在一起,后者要求所有操作都是非阻塞的。您的两个协程(对test的两个调用)都在同一个线程中运行,因此当第二个协程尝试获取锁但被阻止时,它也会阻止第一个协程(持有锁)进行任何其他进展。

您可以改用asyncio.Lock来解决此问题。它只会阻塞等待锁的协程,而不是阻塞整个线程。请注意,此锁不能在进程之间传递,因此除非您停止使用multiprocessing,否则它将无法工作,这在上面的示例代码中实际上不是必需的。您只需创建一个仅在单个子进程中使用的锁,因此您只需在子进程中创建asyncio.Lock,而不会丢失任何功能。

但是,如果您的实际用例需要一个asyncio友好的锁,该锁也可以在进程之间共享,您可以使用aioprocessing来实现这一点(完全披露:我是aioprocessing的作者)。

最新更新