Python 3.9:如何在非异步函数中正确等待锁定



TL;DR:Q:如何在py39中锁定保护共享资源不受同步功能的影响?

很抱歉,如果这个问题已经得到回答了——我就是找不到它。在访问共享资源时,我很难正确地等待锁定。我的API正在访问一个后端API,它需要花费很长时间登录,所以我缓存auth令牌,并在到期时更新:

async def _login():
global connection_lock
while True:
async with connection_lock:
# return a new connection or the one which has been cached in the meantime
# or raise whichever error received while attempting to login

class BackendSystem:
@property
def client(self):
if self._client is None or cache().get('headers') is None:
self._client = loop.run_until_complete(_login())
return self._client

现在的问题是,在某些情况下,对烧瓶的几个请求(可能是并行的(将并行地发送到烧瓶,从而导致此错误:

File "/usr/lib/python3.6/asyncio/base_events.py", line 471, in run_until_complete
self.run_forever()
File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
raise RuntimeError('This event loop is already running')

我确实找到了安装nest-asyncio的建议,但我不确定这是否是我真正需要的TBH。如果我理解正确,那么循环不是嵌套的(即从彼此内部运行,但我很可能错了(,而是我试图使用已经在运行的同一个循环——可能这部分是非法的?

我觉得不可思议的是,要从一个非异步的函数中对共享资源进行非常基本的锁定保护竟然如此困难,我也不想将其转换为异步。

我创建了一个可复制的示例(它帮助很大!(:

import asyncio

connection_lock = asyncio.Lock()
loop = asyncio.get_event_loop()

class Client:
def __init__(self, username: str):
self.username = username
def __str__(self):
return f"Client({self.username!r})"

async def _login(username) -> Client:
global connection_lock
while True:
print(f"{username} will lock")
async with connection_lock:
print(f"{username} got the lock")
await asyncio.sleep(5)  # sleep a bit
print(f"{username} has finished")
return Client(username)

class BackendSystem:
def __init__(self, username: str):
self._client = None
self.username = username
@property
def client(self):
if self._client is None:
self._client = loop.run_until_complete(_login(self.username))
return self._client

def main1():
def do_something(username: str):
print(BackendSystem(username).client)
for username in ["Steffen", "Lenormju"]:
do_something(username)

def main2():
async def do_something(username: str):
print(BackendSystem(username).client)
future = asyncio.gather(
do_something("Steffen"), do_something("Lenormju")
)
results = loop.run_until_complete(future)
return results

if __name__ == '__main__':
print("main1 ================")
main1()
print("main2 ================")
main2()

输出为:

main1 ================
Steffen will lock
Steffen got the lock
Steffen has finished
Client('Steffen')
Lenormju will lock
Lenormju got the lock
Lenormju has finished
Client('Lenormju')
main2 ================
Lenormju will lock
Lenormju got the lock
Steffen will lock
Traceback (most recent call last):
File "C:/PycharmProjects/stack_overflow/68159604.py", line 62, in <module>
main2()
File "C:/PycharmProjects/stack_overflow/68159604.py", line 54, in main2
results = loop.run_until_complete(future)
File "C:Program FilesPython3.6.8libasynciobase_events.py", line 484, in run_until_complete
return future.result()
File "C:/PycharmProjects/stack_overflow/68159604.py", line 49, in do_something
print(BackendSystem(username).client)
File "C:/PycharmProjects/stack_overflow/68159604.py", line 35, in client
self._client = loop.run_until_complete(_login(self.username))
File "C:Program FilesPython3.6.8libasynciobase_events.py", line 471, in run_until_complete
self.run_forever()
File "C:Program FilesPython3.6.8libasynciobase_events.py", line 425, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

当存在并发请求时,问题就会出现,main1显示顺序工作正常。它确认您在Flask运行代码时遇到它(已经在事件循环中(。

一个更简单的例子是:

import asyncio

loop = asyncio.get_event_loop()

if __name__ == '__main__':
async def lock_5_seconds():
await asyncio.sleep(5)
def do_synchronously_something_async():
loop.run_until_complete(lock_5_seconds())
async def do_something_async():
do_synchronously_something_async()
loop.run_until_complete(
asyncio.gather(
do_something_async(),
do_something_async()
)
)

查看asyncio.loop.run_until_complete文档:

运行,直到未来[…]完成。

如本答案所述:

事件循环的方法,如run_foreverrun_until_complete,通常只是启动事件循环的一种方法。

如果你从未停止过,你就不能给他们打几次电话。Flask为你启动了一次,你不应该自己再次启动。

但我想(重新(启动事件循环并不是你的意思
您的实际问题是希望同步调用异步函数。事实上,异步函数被安排在事件循环中运行。你不能直接打电话给他们。

但这个问题的答案告诉您只需调用loop.run_until_complete,这在您的情况下不起作用,因为您已经有一个事件循环在运行。

以下是关于Reddit上类似案例的讨论:从同步函数调用异步函数。还有一个来自使用FastAPI:Python的人的StackOverflow:当已经有事件循环运行时,从同步方法调用异步代码。

结论是有NO的方法可以准确地做你想要的事情
我认为您应该更改设计:对client属性的调用当前是同步的,尽管它必须调用异步(slowwww(代码(_login函数(。

你没有说你的代码的其余部分做什么,但如果它是一个远程API的包装器,我推荐这样的东西:

async def login():
...
class BackendSystem():
async def ensure_is_logged():
...
async def call_this_api_method():
await self.ensure_is_logged()
...

并采用异步代码。

或者只是不要让你的登录功能异步。把两者混合在一起会让人头疼。

最新更新