asyncio.run的文档状态:
此函数总是创建一个新的事件循环,并在结束时关闭它。它应该被用作异步程序的主要入口点,并且应该理想情况下只能调用一次。
但没有说明原因。我有一个非异步程序,它需要调用一些异步的东西。每次到达异步部分时,我可以只使用asyncio.run
吗?还是这不安全/错误?
在我的例子中,我有几个async
协程,我想gather
并并行运行以完成。当它们全部完成后,我想继续使用我的同步代码。
async my_task(url):
# request some urls or whatever
integration_tasks = [my_task(url1), my_task(url2)]
async def gather_tasks(*integration_tasks):
return await asyncio.gather(*integration_tasks)
def complete_integrations(*integration_tasks):
return asyncio.run(gather_tasks(*integration_tasks))
print(complete_integrations(*integration_tasks))
我可以使用asyncio.run()
多次运行协同程序吗
这实际上是一个有趣且非常重要的问题。
正如asyncio(python3.9)的文档所说:
此函数总是创建一个新的事件循环,并在结束时关闭它。它应该用作异步程序的主要入口点,并且理想情况下只能调用一次。
它不禁止多次调用它。此外,还有一种从同步代码调用协程的旧方法,它是:
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
现在由于get_event_loop()
方法而被弃用,文档中写道:
还可以考虑使用asyncio.run()函数,而不是使用较低级别的函数手动创建和关闭事件循环。
自3.10版起已弃用:如果没有正在运行的事件循环,则会发出弃用警告。在未来的Python版本中,此函数将是get_running_roop()的别名。
因此,在未来的版本中,如果已经运行的事件循环不存在,它将不会产生新的事件循环!如果您想在没有新循环的情况下自动生成新循环,文档建议使用asyncio.run()
。
做出这样的决定是有充分理由的。即使你有一个事件循环,并且你将成功地使用它来执行协同程序,你也必须记住:
- 关闭事件循环
- 消耗未消耗的生成器(在协同程序失败的情况下最重要)
- 。。。可能更多,我甚至不想在这里提及
正确完成事件循环到底需要做什么您可以阅读此源代码。
手动管理事件循环(如果没有运行)是一个微妙的过程,最好不要这样做,除非知道自己在做什么。
所以,是的,我认为从同步代码运行异步函数的正确方法是调用asyncio.run()
。但它只适用于完全同步的应用程序。如果已经有正在运行的事件循环,它可能会失败(未测试)。在这种情况下,只需await
即可,或者使用get_runing_loop().run_untilcomplete(coro)
。
对于这样的同步应用程序,使用asyncio.run()
是一种安全的方式,实际上也是唯一安全的方式,并且它可以多次调用。
文档说您应该只调用它一次的原因是,通常整个异步应用程序只有一个入口点。它简化了事情,实际上提高了性能,因为为事件循环设置精简也需要一些时间。但是,如果您的应用程序中没有可用的单个循环,则应该使用对asyncio.run()
的多个调用来多次运行协同程序。
是否有任何性能提升
除了讨论多次致电asyncio.run()
之外,我还想解决另一个问题。在评论中,@jwal说:
asyncio不是并行处理。文件上是这么说的。[…]如果您想要并行,请在具有单独CPU核心的计算机上的单独进程中运行,而不是在单独线程中运行,也不是在单独事件循环中运行。
暗示异步不适合并行处理,这可能会被误解和误导,得出这样的结论,即它不会导致性能提升,这并不总是正确的。而且它通常是假的!
因此,任何时候你都可以将作业委托给外部进程(不仅是python进程,它还可以是数据库工作进程、http调用,理想情况下是任何TCP套接字调用)。你可以使用asyncio来提高性能。在绝大多数情况下,当您使用一个公开异步接口的库时,该库的作者会努力最终等待网络/套接字/进程调用的结果。当此类套接字的响应尚未就绪时,事件循环可以完全自由地执行任何其他任务。如果循环有多个这样的任务,它将获得性能。
这种情况的一个典型例子是调用HTTP端点。在某个时刻,将进行网络调用,因此python线程可以在等待数据出现在TCP套接字缓冲区时自由地执行其他工作。我有一个例子!
该示例使用httpx库来比较对OpenWeatherMap API进行多次调用的性能。有两个功能:
get_weather_async()
get_weather_sync()
第一个对http API执行8个请求,但将这些请求调度到使用asyncio.gather()
在事件循环上协同运行(不是并发!)。
第二个按顺序执行8个同步请求。
为了调用异步函数,我实际上使用了asyncio.run()
方法。此外,我使用timeit
模块对asyncio.run()
执行了4次这样的调用。因此,在一个python应用程序中,asyncio.run()
被调用了4次,只是为了挑战我之前的考虑。
from time import time
import httpx
import asyncio
import timeit
from random import uniform
class AsyncWeatherApi:
def __init__(
self, base_url: str = "https://api.openweathermap.org/data/2.5"
) -> None:
self.client: httpx.AsyncClient = httpx.AsyncClient(base_url=base_url)
async def weather(self, lat: float, lon: float, app_id: str) -> dict:
response = await self.client.get(
"/weather",
params={
"lat": lat,
"lon": lon,
"appid": app_id,
"units": "metric",
},
)
response.raise_for_status()
return response.json()
class SyncWeatherApi:
def __init__(
self, base_url: str = "https://api.openweathermap.org/data/2.5"
) -> None:
self.client: httpx.Client = httpx.Client(base_url=base_url)
def weather(self, lat: float, lon: float, app_id: str) -> dict:
response = self.client.get(
"/weather",
params={
"lat": lat,
"lon": lon,
"appid": app_id,
"units": "metric",
},
)
response.raise_for_status()
return response.json()
def get_random_locations() -> list[tuple[float, float]]:
"""generate 8 random locations in +/-europe"""
return [(uniform(45.6, 52.3), uniform(-2.3, 29.4)) for _ in range(8)]
async def get_weather_async(locations: list[tuple[float, float]]):
api = AsyncWeatherApi()
return await asyncio.gather(
*[api.weather(lat, lon, api_key) for lat, lon in locations]
)
def get_weather_sync(locations: list[tuple[float, float]]):
api = SyncWeatherApi()
return [api.weather(lat, lon, api_key) for lat, lon in locations]
api_key = "secret"
def time_async_job(repeat: int = 1):
locations = get_random_locations()
def run():
return asyncio.run(get_weather_async(locations))
duration = timeit.Timer(run).timeit(repeat)
print(
f"[ASYNC] In {duration}s: done {len(locations)} API calls, all"
f" repeated {repeat} times"
)
def time_sync_job(repeat: int = 1):
locations = get_random_locations()
def run():
return get_weather_sync(locations)
duration = timeit.Timer(run).timeit(repeat)
print(
f"[SYNC] In {duration}s: done {len(locations)} API calls, all repeated"
f" {repeat} times"
)
if __name__ == "__main__":
time_sync_job(4)
time_async_job(4)
最后,打印了性能比较。上面写着:
[SYNC] In 5.5580058859995916s: done 8 API calls, all repeated 4 times
[ASYNC] In 2.865574334995472s: done 8 API calls, all repeated 4 times
这4次重复只是为了表明你可以安全地多次运行asyncio.run()
。它实际上对异步http调用的性能测量产生了破坏性的影响,因为所有32个请求实际上都是在8个异步任务的四个同步批中运行的。只是为了比较一批32个请求的性能:
[SYNC] In 4.373898585996358s: done 32 API calls, all repeated 1 times
[ASYNC] In 1.5169846520002466s: done 32 API calls, all repeated 1 times
所以,是的,若只使用适当的异步库,它可以而且通常会导致性能提高(若库公开了异步API,它通常是故意这样做的,因为它知道某个地方会有网络调用)。