Python aiohttp/asyncio - how to process the returned data



I'm in the process of moving some synchronous code to asyncio using aiohttp. The synchronous code takes 15 minutes to run, so I'm hoping to improve on that.

I have some working code which gets data from some URLs and returns the body of each. But that is just against 1 lab site; I have 70+ actual sites.

So if I use a loop to create a list of all the URLs for all the sites, that would be 700 URLs in one list to be processed. Processing them I don't think is the problem?

But doing "stuff" with the results is what I'm not sure how to program. I already have code that will do "stuff" with each result returned, but I don't know how to program against the right type of result.

When the code runs, does it process all the URLs and return the results in an unknown order, depending on run time?

Do I need a function that will handle any type of result?

import asyncio, aiohttp, ssl
from bs4 import BeautifulSoup

def page_content(page):
    return BeautifulSoup(page, 'html.parser')

async def fetch(session, url):
    with aiohttp.Timeout(15, loop=session.loop):
        async with session.get(url) as response:
            return page_content(await response.text())

async def get_url_data(urls, username, password):
    tasks = []
    # Fetch all responses within one Client session,
    # keep connection alive for all requests.
    async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session:
        for i in urls:
            task = asyncio.ensure_future(fetch(session, i))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
        # you now have all response bodies in this variable
        for i in responses:
            print(i.title.text)
        return responses

def main():
    username = 'monitoring'
    password = '*********'
    ip = '10.10.10.2'
    urls = [
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip, '10.10.0.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip, '10.10.0.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip, 'frontend.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip, 'planner.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip, '10.10.10.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip, '10.11.11.1'),
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip, '10.12.12.60'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip, '10.12.12.60'),
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip, 'lon-dc-01.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip, 'lon-dc-01.domain.com'),
    ]
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_url_data(urls, username, password))
    data = loop.run_until_complete(future)
    print(data)

if __name__ == "__main__":
    main()

Here's an example with concurrent.futures.ProcessPoolExecutor. If it's created without specifying max_workers, the implementation will use os.cpu_count instead. Also note that asyncio.wrap_future is public but undocumented. Alternatively, there's AbstractEventLoop.run_in_executor.

import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import lxml.html

def process_page(html):
    '''Meant for CPU-bound workload'''
    tree = lxml.html.fromstring(html)
    return tree.find('.//title').text

async def fetch_page(url, session):
    '''Meant for IO-bound workload'''
    async with session.get(url, timeout=15) as res:
        return await res.text()

async def process(url, session, pool):
    html = await fetch_page(url, session)
    return await asyncio.wrap_future(pool.submit(process_page, html))

async def dispatch(urls):
    pool = ProcessPoolExecutor()
    async with aiohttp.ClientSession() as session:
        coros = (process(url, session, pool) for url in urls)
        return await asyncio.gather(*coros)

def main():
    urls = [
        'https://stackoverflow.com/',
        'https://serverfault.com/',
        'https://askubuntu.com/',
        'https://unix.stackexchange.com/'
    ]
    result = asyncio.get_event_loop().run_until_complete(dispatch(urls))
    print(result)

if __name__ == '__main__':
    main()

Your code isn't far off the mark. asyncio.gather returns the results in the order of its arguments, so order is preserved here, but page_content will not be called in order.
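A minimal, standalone illustration of that ordering guarantee (my own example, not part of your code): the coroutines finish in whatever order their delays dictate, yet gather returns their results in argument order.

import asyncio

async def work(name, delay):
    await asyncio.sleep(delay)
    print('finished', name)   # completion order depends on the delays
    return name

async def demo():
    results = await asyncio.gather(work('a', 0.3), work('b', 0.1), work('c', 0.2))
    print(results)  # always ['a', 'b', 'c'], regardless of completion order

asyncio.get_event_loop().run_until_complete(demo())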

A few tweaks:

First of all, you don't need ensure_future here. Creating a Task is only needed if you're trying to have a coroutine outlive its parent, i.e. if the task has to keep running even though the function that created it is already done. Here what you need instead is to call asyncio.gather directly with your coroutines:

async def get_url_data(urls, username, password):
    async with aiohttp.ClientSession(...) as session:
        responses = await asyncio.gather(*(fetch(session, i) for i in urls))
        for i in responses:
            print(i.title.text)
        return responses

But calling this would schedule all the fetches at the same time, and with a large number of URLs that is far from optimal. Instead, you should choose a maximum concurrency and ensure at most X fetches are running at any time. To implement this, you can use asyncio.Semaphore(20); this semaphore can only be acquired by at most 20 coroutines, so the others will wait to acquire it until a spot is available.

CONCURRENCY = 20
TIMEOUT = 15

async def fetch(session, sem, url):
    async with sem:
        async with session.get(url) as response:
            return page_content(await response.text())

async def get_url_data(urls, username, password):
    sem = asyncio.Semaphore(CONCURRENCY)
    async with aiohttp.ClientSession(...) as session:
        responses = await asyncio.gather(*(
            asyncio.wait_for(fetch(session, sem, i), TIMEOUT)
            for i in urls
        ))
        for i in responses:
            print(i.title.text)
        return responses

This way, all the fetches are started immediately, but only 20 of them will be able to acquire the semaphore. The others will block at the first async with instruction and wait until another fetch is done.

I also replaced aiohttp.Timeout with its official asyncio equivalent (asyncio.wait_for) here.
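Note that asyncio.wait_for raises asyncio.TimeoutError when the deadline passes, and with gather's default settings the first such exception propagates out of the await. If you would rather collect per-URL failures instead of failing the whole batch, one option (a sketch using gather's return_exceptions flag, not part of the answer above) is:

# Drop-in variant of the gather call in get_url_data above:
# failed fetches come back as exception objects instead of raising.
responses = await asyncio.gather(*(
    asyncio.wait_for(fetch(session, sem, i), TIMEOUT)
    for i in urls
), return_exceptions=True)
for url, res in zip(urls, responses):
    if isinstance(res, Exception):
        print('{} failed: {!r}'.format(url, res))
    else:
        print(res.title.text)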

Finally, for the actual processing of the data: if you are limited by CPU time, asyncio will probably not help you much. You will need to use a ProcessPoolExecutor here to parallelize the actual work onto another CPU, and run_in_executor will probably be useful for that.
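For example, a rough sketch (my own combination of the pieces above, not code from either answer) of handing the BeautifulSoup parsing to a ProcessPoolExecutor via run_in_executor, while keeping the semaphore-limited fetches:

import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiohttp
from bs4 import BeautifulSoup

CONCURRENCY = 20
TIMEOUT = 15

def parse_title(html):
    # CPU-bound parsing runs in a worker process; return plain data
    # (a string) so it is cheap to send back to the main process.
    return BeautifulSoup(html, 'html.parser').title.text

async def fetch(session, sem, url):
    # I/O-bound part: limited to CONCURRENCY simultaneous requests.
    async with sem:
        async with session.get(url) as response:
            return await response.text()

async def get_url_data(urls, username, password):
    sem = asyncio.Semaphore(CONCURRENCY)
    pool = ProcessPoolExecutor()
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession(
            auth=aiohttp.BasicAuth(username, password)) as session:
        bodies = await asyncio.gather(*(
            asyncio.wait_for(fetch(session, sem, i), TIMEOUT)
            for i in urls
        ))
    # CPU-bound part: parse every body in the process pool.
    titles = await asyncio.gather(*(
        loop.run_in_executor(pool, parse_title, body) for body in bodies
    ))
    pool.shutdown()
    return titles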
