asyncio/aiohttp-如何发出一系列异步但依赖的请求



我有一系列异步请求对,其中一对由请求a和请求B组成。此外,请求B依赖于请求a。换句话说,我需要将数据从响应a传递到请求B。因此,我需要安排任务,使每个任务发送请求a,然后只有在响应a返回后才发送请求B。

from aiohttp import ClientSession
from typing import *
import asyncio
async def request_A(url: str, session: ClientSession) -> dict:
async with session.request('get', url) as response:
return await response.json()
async def request_B(url: str, data: dict, session: ClientSession) -> dict:
async with session.request('post', url, json=data) as response:
return await response.json()
async def request_chain(url_A: str, url_B: str, session: ClientSession) -> dict:
response_A_data = await request_A(url_A, session)
response_B_data = await request_B(url_B, response_A_data, session)
return response_B_data
async def schedule(url_chains: List[Tuple[str, str]]) -> list:
tasks = []
async with ClientSession() as session:
for url_chain in url_chains:
url_A, url_B = url_chain
task = asyncio.create_task(request_chain(url_A, url_B, session))
tasks.append(task)
return await asyncio.gather(*tasks)
def run_tasks(url_chains: List[Tuple[str, str]]) -> list:
return asyncio.run(schedule(url_chains))

现在,我的问题是:对于由一对请求组成的每个任务,请求a是否保证在发送请求B之前返回?请解释。我担心在任务中,当等待请求A时,请求B可能会执行。

如果没有,我如何保持任务异步和非阻塞,同时确保在任务中,请求A阻塞请求B的执行,直到响应A返回?

我知道我可以在一个批处理中运行所有请求A调用,然后在一个批次中运行所有的请求B调用,但由于特定于我的用例的原因,我需要运行一批所有(请求A,请求B(对。

对于由一对请求组成的每个任务,是请求a保证在发送请求B之前返回?

是的,异步/等待模式的优点是您不必问自己这个问题,连续的代码行将始终按顺序执行(但不一定连续(。在这里,函数request_chain保证request_A总是在request_B之前执行。

在等待请求A时,请求B可能会执行

这不会发生,这基本上就是await的意思:挂起,直到请求A返回,然后再继续。换句话说,await对执行顺序没有影响。它只是交给控制,所以隐藏时间可以由其他人使用(在您的情况下,来自另一个(A,B(请求对的任何代码(。这就是为什么连续的代码行不一定是连续执行的原因,使用await将控制权交给其他一些协程(我们刚刚提到的其他人(允许该协程在A和B之间执行代码。

即使这有点不准确,你也可以记住:唯一将并行执行的代码是你自己调度的代码(在本例中使用asyncio.gather,调度几个(a,B(对并行执行(。

我知道我可以在一个批处理中运行所有请求A调用,然后在一个批次中运行所有的请求B调用,但由于特定于我的用例的原因,我需要运行一批所有。。。

在这种特殊情况下,即使您可以先运行一批a,然后再运行一批B的,我认为您的解决方案会更好,因为它以更简单的方式突出了aB之间的关系。

这里有一个代码示例,您可以运行它来尝试(它的作用与您在这里使用公共数学API所做的相同(,它只需分两步计算"x*2+2",首先是"*2"(相当于请求a(,然后是"+2"(相当于请求B

MATH_API_URL = "http://api.mathjs.org/v4"
from aiohttp import ClientSession
import asyncio
async def maths(session, url, expression):
params = {"expr" : expression}
print(f"t> computing {expression}")
async with session.get(url, params=params) as response:
result = await response.text()
print(f"t< {expression} = {result}")
return result
async def twice(session, x):
return await maths(session, MATH_API_URL, f"2 * {x}")
async def plus_two(session, x):
return await maths(session, MATH_API_URL, f"2 + {x}")
async def twice_plus_two(session, x):
twice_x = await twice(session, x)
return await plus_two(session, twice_x)
async def main(inputs):
async with ClientSession() as session:
return await asyncio.gather(*(twice_plus_two(session, x) for x in inputs))
inputs = list(range(3))
print([x*2+2 for x in inputs])
print(asyncio.run(main(inputs)))

此代码输出安排请求的顺序:

[2, 4, 6]    
> computing 2 * 0    
> computing 2 * 1    
> computing 2 * 2    
< 2 * 1 = 2    
> computing 2 + 2
< 2 * 0 = 0    
> computing 2 + 0
< 2 * 2 = 4    
> computing 2 + 4
< 2 + 2 = 4    
< 2 + 4 = 6
< 2 + 0 = 2
['2', '4', '6']

查看"*2"返回后如何安排"+2"。

最新更新