使用asyncio.run,多次运行是否安全



asyncio.run的文档状态:

此函数总是创建一个新的事件循环,并在结束时关闭它。它应该被用作异步程序的主要入口点,并且应该理想情况下只能调用一次。

但没有说明原因。我有一个非异步程序,它需要调用一些异步的东西。每次到达异步部分时,我可以只使用asyncio.run吗?还是这不安全/错误?

在我的例子中,我有几个async协程,我想gather并并行运行以完成。当它们全部完成后,我想继续使用我的同步代码。

async my_task(url):
# request some urls or whatever
integration_tasks = [my_task(url1), my_task(url2)]
async def gather_tasks(*integration_tasks):
return await asyncio.gather(*integration_tasks)

def complete_integrations(*integration_tasks):
return asyncio.run(gather_tasks(*integration_tasks))
print(complete_integrations(*integration_tasks))

我可以使用asyncio.run()多次运行协同程序吗

这实际上是一个有趣且非常重要的问题。

正如asyncio(python3.9)的文档所说:

此函数总是创建一个新的事件循环,并在结束时关闭它。它应该用作异步程序的主要入口点,并且理想情况下只能调用一次。

它不禁止多次调用它。此外,还有一种从同步代码调用协程的旧方法,它是:

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

现在由于get_event_loop()方法而被弃用,文档中写道:

还可以考虑使用asyncio.run()函数,而不是使用较低级别的函数手动创建和关闭事件循环。

自3.10版起已弃用:如果没有正在运行的事件循环,则会发出弃用警告。在未来的Python版本中,此函数将是get_running_roop()的别名。

因此,在未来的版本中,如果已经运行的事件循环不存在,它将不会产生新的事件循环!如果您想在没有新循环的情况下自动生成新循环,文档建议使用asyncio.run()

做出这样的决定是有充分理由的。即使你有一个事件循环,并且你将成功地使用它来执行协同程序,你也必须记住:

  • 关闭事件循环
  • 消耗未消耗的生成器(在协同程序失败的情况下最重要)
  • 。。。可能更多,我甚至不想在这里提及

正确完成事件循环到底需要做什么您可以阅读此源代码。

手动管理事件循环(如果没有运行)是一个微妙的过程,最好不要这样做,除非知道自己在做什么

所以,是的,我认为从同步代码运行异步函数的正确方法是调用asyncio.run()。但它只适用于完全同步的应用程序。如果已经有正在运行的事件循环,它可能会失败(未测试)。在这种情况下,只需await即可,或者使用get_runing_loop().run_untilcomplete(coro)

对于这样的同步应用程序,使用asyncio.run()是一种安全的方式,实际上也是唯一安全的方式,并且它可以多次调用。

文档说您应该只调用它一次的原因是,通常整个异步应用程序只有一个入口点。它简化了事情,实际上提高了性能,因为为事件循环设置精简也需要一些时间。但是,如果您的应用程序中没有可用的单个循环,则应该使用对asyncio.run()的多个调用来多次运行协同程序。

是否有任何性能提升

除了讨论多次致电asyncio.run()之外,我还想解决另一个问题。在评论中,@jwal说:

asyncio不是并行处理。文件上是这么说的。[…]如果您想要并行,请在具有单独CPU核心的计算机上的单独进程中运行,而不是在单独线程中运行,也不是在单独事件循环中运行。

暗示异步不适合并行处理,这可能会被误解和误导,得出这样的结论,即它不会导致性能提升,这并不总是正确的。而且它通常是假的!

因此,任何时候你都可以将作业委托给外部进程(不仅是python进程,它还可以是数据库工作进程、http调用,理想情况下是任何TCP套接字调用)。你可以使用asyncio来提高性能。在绝大多数情况下,当您使用一个公开异步接口的库时,该库的作者会努力最终等待网络/套接字/进程调用的结果。当此类套接字的响应尚未就绪时,事件循环可以完全自由地执行任何其他任务。如果循环有多个这样的任务,它将获得性能

这种情况的一个典型例子是调用HTTP端点。在某个时刻,进行网络调用,因此python线程可以在等待数据出现在TCP套接字缓冲区时自由地执行其他工作。我有一个例子!

该示例使用httpx库来比较对OpenWeatherMap API进行多次调用的性能。有两个功能:

  • get_weather_async()
  • get_weather_sync()

第一个对http API执行8个请求,但将这些请求调度到使用asyncio.gather()在事件循环上协同运行(不是并发!)。

第二个按顺序执行8个同步请求。

为了调用异步函数,我实际上使用了asyncio.run()方法。此外,我使用timeit模块对asyncio.run()执行了4次这样的调用。因此,在一个python应用程序中,asyncio.run()被调用了4次,只是为了挑战我之前的考虑。

from time import time
import httpx
import asyncio
import timeit
from random import uniform

class AsyncWeatherApi:
def __init__(
self, base_url: str = "https://api.openweathermap.org/data/2.5"
) -> None:
self.client: httpx.AsyncClient = httpx.AsyncClient(base_url=base_url)
async def weather(self, lat: float, lon: float, app_id: str) -> dict:
response = await self.client.get(
"/weather",
params={
"lat": lat,
"lon": lon,
"appid": app_id,
"units": "metric",
},
)
response.raise_for_status()
return response.json()

class SyncWeatherApi:
def __init__(
self, base_url: str = "https://api.openweathermap.org/data/2.5"
) -> None:
self.client: httpx.Client = httpx.Client(base_url=base_url)
def weather(self, lat: float, lon: float, app_id: str) -> dict:
response = self.client.get(
"/weather",
params={
"lat": lat,
"lon": lon,
"appid": app_id,
"units": "metric",
},
)
response.raise_for_status()
return response.json()

def get_random_locations() -> list[tuple[float, float]]:
"""generate 8 random locations in +/-europe"""
return [(uniform(45.6, 52.3), uniform(-2.3, 29.4)) for _ in range(8)]

async def get_weather_async(locations: list[tuple[float, float]]):
api = AsyncWeatherApi()
return await asyncio.gather(
*[api.weather(lat, lon, api_key) for lat, lon in locations]
)

def get_weather_sync(locations: list[tuple[float, float]]):
api = SyncWeatherApi()
return [api.weather(lat, lon, api_key) for lat, lon in locations]

api_key = "secret"

def time_async_job(repeat: int = 1):
locations = get_random_locations()
def run():
return asyncio.run(get_weather_async(locations))
duration = timeit.Timer(run).timeit(repeat)
print(
f"[ASYNC] In {duration}s: done {len(locations)} API calls, all"
f" repeated {repeat} times"
)

def time_sync_job(repeat: int = 1):
locations = get_random_locations()
def run():
return get_weather_sync(locations)
duration = timeit.Timer(run).timeit(repeat)
print(
f"[SYNC] In {duration}s: done {len(locations)} API calls, all repeated"
f" {repeat} times"
)

if __name__ == "__main__":
time_sync_job(4)
time_async_job(4)

最后,打印了性能比较。上面写着:

[SYNC] In 5.5580058859995916s: done 8 API calls, all repeated 4 times
[ASYNC] In 2.865574334995472s: done 8 API calls, all repeated 4 times

这4次重复只是为了表明你可以安全地多次运行asyncio.run()。它实际上对异步http调用的性能测量产生了破坏性的影响,因为所有32个请求实际上都是在8个异步任务的四个同步批中运行的。只是为了比较一批32个请求的性能:

[SYNC] In 4.373898585996358s: done 32 API calls, all repeated 1 times
[ASYNC] In 1.5169846520002466s: done 32 API calls, all repeated 1 times

所以,是的,若只使用适当的异步库,它可以而且通常会导致性能提高(若库公开了异步API,它通常是故意这样做的,因为它知道某个地方会有网络调用)。

最新更新