我正在python 3.6中编写一个工具,该工具将请求发送到多个API(带有各种端点(,并收集其对解析的响应并将其保存在数据库中。
我使用的API客户端具有同步请求URL的版本,例如他们使用
urllib.request.Request('...
或他们使用Kenneth Reitz'Requests
库。
由于我的API调用依赖于请求URL的同步版本,因此整个过程需要几分钟才能完成。
现在,我想将我的API调用包装在异步/等待(异步(中。我正在使用Python 3.6。
我发现的所有示例/教程都希望我将同步URL调用/requests
更改为IT的异步版本(例如aiohttp
(。由于我的代码依赖于我尚未写的API客户端(我无法更改(,因此我需要将该代码不变。
因此,是否可以将我的同步请求(阻止代码(包装在异步/等待中,以使它们在事件循环中运行?
我是Python的Asyncio的新手。这将是nodejs中的不费吹灰之力。但是我不能用python缠绕我的头。
更新2023-06-12
这是我在python 3.9
中做到的import asyncio
import requests
async def main():
response1 = await asyncio.to_thread(requests.get, 'http://httpbin.org/get')
response2 = await asyncio.to_thread(requests.get, 'https://api.github.com/events')
print(response1.text)
print(response2.text)
asyncio.run(main())
解决方案是将您的同步代码包装在线程中并以这种方式运行。我使用该精确系统使我的asyncio
代码运行boto3
(注意:如果运行< python3.6(:
async def get(self, key: str) -> bytes:
s3 = boto3.client("s3")
loop = asyncio.get_event_loop()
try:
response: typing.Mapping =
await loop.run_in_executor( # type: ignore
None, functools.partial(
s3.get_object,
Bucket=self.bucket_name,
Key=key))
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "NoSuchKey":
raise base.KeyNotFoundException(self, key) from e
elif e.response["Error"]["Code"] == "AccessDenied":
raise base.AccessDeniedException(self, key) from e
else:
raise
return response["Body"].read()
请注意,这将起作用,因为s3.get_object()
代码中的大量时间用于等待I/O,并且(通常(在等待I/O Python释放GIL时(GIL是通常线程螺纹的原因(在python中不是一个好主意(。
run_in_executor
中的第一个参数 None
表示我们在默认执行程序中运行。这是threadpool遗嘱执行人,但可能会使明确分配threadpool executor的事情更明确。
请注意,在使用纯ASYNC I/O的情况下,您可以轻松地同时打开数千个连接,使用ThreadPool Executer表示每个同时呼叫对API的呼叫都需要单独的线程。一旦您的池中的线程用完了,直到线程可用之前,ThreadPool将不会安排您的新调用。您显然可以提高线程数量,但这会吞噬记忆。不要指望能够跨越几千。
还请参见Python ThreadPoolExecutor文档,以获取解释,以及有关如何将同步调用的代码稍有不同的代码。