将同步请求包装到Asyncio(异步/等待)中



我正在python 3.6中编写一个工具,该工具将请求发送到多个API(带有各种端点(,并收集其对解析的响应并将其保存在数据库中。

我使用的API客户端具有同步请求URL的版本,例如他们使用

urllib.request.Request('...

或他们使用Kenneth Reitz'Requests库。

由于我的API调用依赖于请求URL的同步版本,因此整个过程需要几分钟才能完成。

现在,我想将我的API调用包装在异步/等待(异步(中。我正在使用Python 3.6。

我发现的所有示例/教程都希望我将同步URL调用/requests更改为IT的异步版本(例如aiohttp(。由于我的代码依赖于我尚未写的API客户端(我无法更改(,因此我需要将该代码不变。

因此,是否可以将我的同步请求(阻止代码(包装在异步/等待中,以使它们在事件循环中运行?

我是Python的Asyncio的新手。这将是nodejs中的不费吹灰之力。但是我不能用python缠绕我的头。

更新2023-06-12

这是我在python 3.9

中做到的
import asyncio
import requests
async def main():
    response1 = await asyncio.to_thread(requests.get, 'http://httpbin.org/get')
    response2 = await asyncio.to_thread(requests.get, 'https://api.github.com/events')
    print(response1.text)
    print(response2.text)
asyncio.run(main())

解决方案是将您的同步代码包装在线程中并以这种方式运行。我使用该精确系统使我的asyncio代码运行boto3(注意:如果运行< python3.6(:

async def get(self, key: str) -> bytes:
    s3 = boto3.client("s3")
    loop = asyncio.get_event_loop()
    try:
        response: typing.Mapping = 
            await loop.run_in_executor(  # type: ignore
                None, functools.partial(
                    s3.get_object,
                    Bucket=self.bucket_name,
                    Key=key))
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "NoSuchKey":
            raise base.KeyNotFoundException(self, key) from e
        elif e.response["Error"]["Code"] == "AccessDenied":
            raise base.AccessDeniedException(self, key) from e
        else:
            raise
    return response["Body"].read()

请注意,这将起作用,因为s3.get_object()代码中的大量时间用于等待I/O,并且(通常(在等待I/O Python释放GIL时(GIL是通常线程螺纹的原因(在python中不是一个好主意(。

run_in_executor中的第一个参数 None表示我们在默认执行程序中运行。这是threadpool遗嘱执行人,但可能会使明确分配threadpool executor的事情更明确。

请注意,在使用纯ASYNC I/O的情况下,您可以轻松地同时打开数千个连接,使用ThreadPool Executer表示每个同时呼叫对API的呼叫都需要单独的线程。一旦您的池中的线程用完了,直到线程可用之前,ThreadPool将不会安排您的新调用。您显然可以提高线程数量,但这会吞噬记忆。不要指望能够跨越几千。

还请参见Python ThreadPoolExecutor文档,以获取解释,以及有关如何将同步调用的代码稍有不同的代码。

最新更新