为任意可迭代对象创建异步迭代器



我正在创建一个对外部api进行许多调用的项目。这些API调用是在类实例的方法中进行的。我试图做一个通用的函数,它需要这些对象的可迭代对象,并为他们产生一个异步迭代器。然后,这个异步迭代器将用于使所有这些外部API调用异步运行。

然而,通过我下面的尝试,执行时间仍然随着列表的长度线性增加。

async def async_iterator(iterable: Iterable[Any]) -> AsyncIterator[Any]:
for i in iterable:
yield i
async for object in async_generator(iterable=list_of_objects):
await object.make_time_consuming_api_call()
# do other work on the object
await update_in_database(object=object)

如何异步迭代任何对象列表?

由于您正在等待object.make_time_consuming_api_call(),因此它等待每个调用在运行下一次迭代之前完成。您可以在提交所有调用之后等待它,例如asyncio.create_task:

async def async_iterator(iterable: Iterable[Any]) -> AsyncIterator[Any]:
for i in iterable:
yield i
async def main():
tasks = list()
async for object in async_generator(iterable=list_of_objects):
tasks.append(asyncio.create_task(object.make_time_consuming_api_call()))
# do other work on the object
for task, object in zip(tasks, list_of_objects):
await task
await update_in_database(object=object)
在本例中,您甚至不需要创建async_iterator:
async def main():
tasks = list()
for object in list_of_objects:
tasks.append(asyncio.create_task(object.make_time_consuming_api_call()))

或者更简洁一点:

async def main():
results = await asyncio.gather(*(object.make_time_consuming_api_call() for object in list_of_objects))
# Added this to store the result as an attribute (see comments)
for result, object in zip(results, list_of_objects):
object.attribute = result

最新更新