asyncio.gather() on a list of dicts that have a coroutine field



I have the following two async functions:

import asyncio
from tornado.httpclient import AsyncHTTPClient

async def get_categories():  # returns a list of str
    # ....
    http = AsyncHTTPClient()
    resp = await http.fetch(....)
    return [....]

async def get_details(category):  # returns a list of dict
    # ....
    http = AsyncHTTPClient()
    resp = await http.fetch(....)
    return [....]

Now I need to create a function that gets the details for all categories (running the HTTP fetches concurrently) and combines them into one list.

async def get_all_details():
    categories = await get_categories()
    tasks = list(map(lambda x: {'category': x, 'task': get_details(x)}, categories))
    r = await asyncio.gather(*tasks)  # error
    # need to return [
    #   {'category':'aaa', 'detail':'aaa detail 1'},
    #   {'category':'aaa', 'detail':'aaa detail 2'},
    #   {'category':'bbb', 'detail':'bbb detail 1'},
    #   {'category':'bbb', 'detail':'bbb detail 2'},
    #   {'category':'bbb', 'detail':'bbb detail 3'},
    #   {'category':'ccc', 'detail':'ccc detail 1'},
    #   {'category':'ccc', 'detail':'aaa detail 2'},
    # ]

However, the line marked # error raises:

TypeError: unhashable type: 'dict'

tasks has the following value:

[{'category': 'aaa',
'task': <coroutine object get_docker_list at 0x000001B12B8560C0>},
{'category': 'bbb',
'task': <coroutine object get_docker_list at 0x000001B12B856F40>},
{'category': 'ccc',
'task': <coroutine object get_docker_list at 0x000001B12B856740>}]

By the way, is there a way to throttle the HTTP fetch calls? For example, so that at most four fetches run at the same time.

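gather accepts coroutines (or other awaitables) as arguments, and awaiting it yields a list of their results in the same order. You are passing it a sequence of dicts, some of whose values are coroutines. gather doesn't know what to do with that: it tries to treat each dict as an awaitable object and fails almost immediately. In CPython the failure shows up as the unhashable-type TypeError because gather uses each argument as a key in an internal dict before scheduling it, and a dict cannot be hashed.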

The correct way to produce a list of dicts is to pass only the coroutines to gather, await the results, and then build new dicts from them:

async def get_all_details():
    category_list = await get_categories()
    details_list = await asyncio.gather(
        *[get_details(category) for category in category_list]
    )
    return [
        {'category': category, 'details': details}
        for (category, details) in zip(category_list, details_list)
    ]
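
The function above returns one dict per category, with the whole details list under 'details'. If the flattened shape from the question is wanted (one dict per individual detail), a nested comprehension should do it. The following is only a sketch: the two stub coroutines are hypothetical stand-ins for the real HTTP-backed functions, get_all_details_flat is an illustrative name, and it assumes each element returned by get_details is what belongs under 'detail'.

```python
import asyncio

# Hypothetical stand-ins for the real HTTP-backed coroutines.
async def get_categories():
    return ['aaa', 'bbb']

async def get_details(category):
    await asyncio.sleep(0)  # placeholder for the real HTTP fetch
    return [f'{category} detail {i}' for i in (1, 2)]

async def get_all_details_flat():
    category_list = await get_categories()
    # gather preserves argument order, so results line up with category_list.
    details_list = await asyncio.gather(
        *[get_details(category) for category in category_list]
    )
    # Emit one dict per (category, detail) pair instead of one per category.
    return [
        {'category': category, 'detail': detail}
        for category, details in zip(category_list, details_list)
        for detail in details
    ]

flat = asyncio.run(get_all_details_flat())
```

Because gather keeps results in argument order, zip pairs every details list with the category it was requested for, regardless of which fetch finishes first.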

"BTW, is there a way to throttle the HTTP fetch calls? For example, so that at most four fetches run at the same time."

A convenient and idiomatic way to limit concurrent calls is to use a semaphore:

async def get_details(category, limit):
    # acquiring the semaphore passed as `limit` will allow at most a
    # fixed number of coroutines to proceed concurrently
    async with limit:
        ...  # the rest of the code

async def get_all_details():
    limit = asyncio.Semaphore(4)
    category_list = await get_categories()
    details_list = await asyncio.gather(
        *[get_details(category, limit) for category in category_list]
    )
    ...  # the rest of the code
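
To check that the semaphore really caps concurrency, here is a small self-contained sketch. It is not the real Tornado code: fake_fetch is a hypothetical placeholder for the HTTP call, the category names are made up, and the running/peak counters exist only to observe how many fetches overlap.

```python
import asyncio

running = 0  # fetches currently in flight
peak = 0     # highest value `running` ever reached

async def fake_fetch(category):
    # Hypothetical placeholder for `await http.fetch(...)`.
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)  # simulate network latency
    running -= 1
    return [f'{category} detail']

async def get_details(category, limit):
    # At most 4 coroutines can be inside this block at once.
    async with limit:
        return await fake_fetch(category)

async def get_all_details():
    limit = asyncio.Semaphore(4)
    category_list = [f'cat{i}' for i in range(10)]  # made-up categories
    details_list = await asyncio.gather(
        *[get_details(category, limit) for category in category_list]
    )
    return [
        {'category': category, 'details': details}
        for category, details in zip(category_list, details_list)
    ]

result = asyncio.run(get_all_details())
```

All ten coroutines are started by gather right away, but only four at a time get past the async with line; the rest wait on the semaphore until a slot is released.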