使用PythonAiohttp时,将多个API请求合并为一个请求



我有一个API端点,它可以获取一个或多个对象ID并返回它们的响应,例如http://example.com/api/metadata?id=1&id=2&id=3。API端点对每个调用而不是每个ID都有速率限制,因此最好使用多个ID来调用API端点。

另一方面,我有现有的代码,试图获得每个ID的元数据,比如:

async def get_metadata(object_id):
    # One HTTP round-trip per ID; the endpoint is rate-limited per call,
    # which is exactly why the question asks for batching.
    response = await session.get(f"http://example.com/api/metadata?id={object_id}")
    response.raise_for_status()
    payload = await response.json()
    return payload['results'][object_id]

我想保持这个函数的签名不变,但对其进行更改,使其不进行单独的请求,而是阻塞,直到 (a) 有50个ID准备好被提取,或 (b) 出现10秒之类的超时,且在该超时内已有一些(但不到50个)ID准备好被取出。然后发出一个API请求,再对get_metadata的每个(阻塞的)调用返回相应的结果。所以get_metadata的外部行为应该保持不变。

我尝试了一些使用信号量或队列的方法,但我被卡住了。那么,实现这一点的好方法是什么呢?

经过一段时间的修补,我想出了这个:

import abc, asyncio
class Batcher(metaclass=abc.ABCMeta):
    """Collects items and processes them in batches.

    A batch is dispatched as soon as it holds max_batch items, or when
    timeout seconds have elapsed since the batch was opened, whichever
    comes first. Subclasses implement _process() to handle one batch.
    """

    def __init__(self, *, max_batch, timeout):
        """
        Constructs a new Batcher. The parameter max_batch specifies
        the queue capacity, while timeout is the deadline after which
        a queue will be processed whether it's at capacity or not.
        """
        self.__batch = None    # current open batch: list of (item, future), or None
        self.__event = None    # signalled by _finish() when the batch closes early
        self.__timeout = timeout
        self.__maxsize = max_batch

    async def __wait(self, event, batch):
        # One background task per batch. It waits until either _finish()
        # sets the event (batch reached capacity) or the deadline expires,
        # then processes the batch captured at creation time.
        try:
            await asyncio.wait_for(event.wait(), timeout=self.__timeout)
        except asyncio.TimeoutError:
            # Deadline hit while the batch was still open: close it here,
            # mirroring _finish(), so new enqueues start a fresh batch.
            # The identity guard is cheap insurance against this task
            # racing a newer batch generation.
            if self.__event is event:
                self.__batch = None
                self.__event = None
        # BUG FIX: process the captured `batch`, not self.__batch — by the
        # time the event fires, _finish() has already reset self.__batch
        # to None (and a new batch may even have been opened), so the
        # original `self.__run(self.__batch)` processed None and crashed.
        await self.__run(batch)

    async def __run(self, batch):
        # NOTE: unlike the original, this no longer resets self.__batch;
        # doing so here could clobber a *newer* batch opened after
        # _finish(). The batch is closed in _finish()/__wait() instead.
        try:
            await self._process(batch)
        except Exception as e:
            # Propagate the failure to every caller still waiting.
            for _, future in batch:
                if not future.done():
                    future.set_exception(e)
        else:
            # _process may leave some futures unresolved; default to None.
            for _, future in batch:
                if not future.done():
                    future.set_result(None)

    def _setup(self):
        """
        Initialises a new batch, if none is currently open.
        """
        if self.__event is not None:
            return
        self.__batch = []
        self.__event = asyncio.Event()
        asyncio.create_task(self.__wait(self.__event, self.__batch))

    def _finish(self):
        """
        Marks the current batch as complete and starts processing it.
        """
        self.__batch = None
        self.__event.set()
        self.__event = None

    def _enqueue(self, item):
        """
        Adds an item to be processed in the next batch.

        Returns: an awaitable that will return the result of processing
        when awaited.
        """
        self._setup()
        future = asyncio.Future()
        self.__batch.append((item, future))
        if len(self.__batch) >= self.__maxsize:
            self._finish()
        return future

    @abc.abstractmethod
    async def _process(self, batch):
        """
        Processes the current batch. The batch parameter contains a list
        of pairs (item, future), where item is the value passed to _enqueue,
        while future is an asyncio.Future. Call the .set_result and/or
        .set_exception methods on the latter to return a result to the
        caller; if you don't assign a result yourself, the returned value
        will be None.
        """
        raise NotImplementedError

您可以对Batcher进行子类化,以围绕_enqueue创建一个外观,该外观将验证参数并为处理做好准备。

示例:

import urllib
import urllib.parse
def singleton(*args, **kwargs):
    """Class decorator factory: replaces the decorated class with a single
    instance of it, constructed with the given arguments."""
    return lambda cls: cls(*args, **kwargs)
@singleton(max_batch=50, timeout=10)
class get_metadata(Batcher):
    """Callable facade over Batcher: awaiting get_metadata(object_id)
    coalesces up to 50 IDs (or 10 seconds' worth) into one API request."""

    async def _process(self, batch):
        # One query string carrying every ID in the batch.
        query = "&".join(
            f"id={urllib.parse.quote(str(oid))}"
            for oid, _ in batch
        )
        response = await session.get("http://example.com/api/metadata?" + query)
        response.raise_for_status()
        results = (await response.json())['results']
        for oid, future in batch:
            try:
                future.set_result(results[oid])
            except Exception as e:
                # A missing ID (or an already-done future) fails only
                # the caller it concerns, not the whole batch.
                future.set_exception(e)

    async def __call__(self, object_id):
        if not isinstance(object_id, int):
            raise ValueError(object_id)
        return await self._enqueue(object_id)

这里,get_metadata是一个类实例,但由于__call__的特殊方法,您可以像调用普通函数一样调用它。

信号量在这里不起作用,因为它们的工作方式与您需要的相反:在一定数量的协同程序获取它们之前,它们不会阻塞。您需要一个异步等价的barrier,不幸的是,它在标准库中并不存在。

幸运的是,使用事件和列表实现屏障并不困难。你可以这样做(只经过粗略测试):

# Pending callers: list of (object_id, future) pairs awaiting a batch.
_waiters = []
# Created lazily on the first call; signalled whenever a waiter is added.
_have_new_waiter = None


async def get_metadata(session, object_id):
    """Fetch metadata for object_id, transparently batching concurrent
    callers into one API request (50 IDs or a 10-second deadline)."""
    global _have_new_waiter
    if _have_new_waiter is None:
        # First call: start the background task that watches the queue.
        _have_new_waiter = asyncio.Event()
        asyncio.create_task(_monitor_incoming(session))
    future = asyncio.get_running_loop().create_future()
    _waiters.append((object_id, future))
    _have_new_waiter.set()
    return await future


async def _monitor_incoming(session):
    """Dispatch a batch once 50 waiters are queued, or on a 10 s timeout
    with at least one waiter queued."""
    while True:
        timeout = False
        try:
            await asyncio.wait_for(_have_new_waiter.wait(), 10)
        except asyncio.TimeoutError:
            timeout = True
        _have_new_waiter.clear()
        # Wait for a full batch unless the deadline forced a flush.
        if len(_waiters) == 0 or (len(_waiters) < 50 and not timeout):
            continue
        lst = _waiters[:]
        del _waiters[:]
        asyncio.create_task(_get_batch(session, lst))


async def _get_batch(session, waiter_lst):
    """Perform one API call for all queued IDs and resolve their futures."""
    object_ids = [object_id for (object_id, _future) in waiter_lst]
    # BUG FIX: the endpoint takes repeated id= parameters (id=1&id=2&id=3);
    # the original f-string produced the malformed "ids=1&2&3".
    query = "&".join(f"id={object_id}" for object_id in object_ids)
    try:
        # BUG FIX: bind the response with `as response` (it was unbound),
        # and await response.json() (it is a coroutine). Also read into
        # `results` — the original assigned `dct` but then used `results`.
        async with session.get(
                f"http://example.com/api/metadata?{query}") as response:
            response.raise_for_status()
            results = (await response.json())['results']
    except Exception as e:
        # BUG FIX: the original zipped over `results`, which is unbound
        # on this path; fail every waiter with the request's exception.
        for _object_id, future in waiter_lst:
            future.set_exception(e)
        return
    for object_id, future in waiter_lst:
        future.set_result(results[object_id])

最新更新