将 aiohttp 与多处理相结合



我正在制作一个脚本,该脚本可以获取近20 000页的HTML,并对其进行解析以仅获取其中的一部分。

我设法使用 asyncio 和 aiohttp 在数据帧中获取 20 000 页的内容,其中包含不同步的请求,但此脚本仍在等待获取所有页面以解析它们。

async def get_request(session, url, params=None):
    async with session.get(url, headers=HEADERS, params=params) as response:
        return await response.text()

async def get_html_from_url(urls):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            tasks.append(get_request(session, url))
        html_page_response = await asyncio.gather(*tasks)
    return html_page_response

html_pages_list = asyncio_loop.run_until_complete(get_html_from_url(urls))

一旦我有了每个页面的内容,我就设法使用多处理的池来并行解析。

get_whatiwant_from_html(html_content):
    parsed_html = BeautifulSoup(html_content, "html.parser")
    clean = parsed_html.find("div", class_="class").get_text()
    # Some re.subs
    clean = re.sub("", "", clean)
    clean = re.sub("", "", clean)
    clean = re.sub("", "", clean)  
    return clean

pool = Pool(4)
what_i_want = pool.map(get_whatiwant_from_html, html_content_list)

这段代码异步混合了获取和解析,但我想将多处理集成到其中:

async def process(url, session):
    html = await getRequest(session, url)
    return await get_whatiwant_from_html(html)
async def dispatch(urls):
    async with aiohttp.ClientSession() as session:
        coros = (process(url, session) for url in urls)
        return await asyncio.gather(*coros)
result = asyncio.get_event_loop().run_until_complete(dispatch(urls))

有什么明显的方法可以做到这一点吗?我想过创建 4 个进程,每个进程运行异步调用,但实现看起来有点复杂,我想知道是否有另一种方法。

我对 asyncio 和 aiohttp 很陌生,所以如果你有什么建议我阅读以获得更好的理解,我会很高兴。

您可以使用

ProcessPoolExecutor。

使用run_in_executor您可以在主异步进程中执行IO。

但是您在单独的进程中进行繁重的 CPU 计算。

async def get_data(session, url, params=None):
    loop = asyncio.get_event_loop()
    async with session.get(url, headers=HEADERS, params=params) as response:
        html = await response.text()
        data = await loop.run_in_executor(None, partial(get_whatiwant_from_html, html))
        return data
async def get_data_from_urls(urls):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            tasks.append(get_data(session, url))
        result_data = await asyncio.gather(*tasks)
    return result_data
executor = concurrent.futures.ProcessPoolExecutor(max_workers=10)
asyncio_loop.set_default_executor(executor)
results = asyncio_loop.run_until_complete(get_data_from_urls(urls))
你可以

通过将BeautifulSoup解析器从html.parser更改为lxml来提高解析速度,这是迄今为止最快的,其次是html5lib.html.parser是其中最慢的。

您的瓶颈不是处理问题,而是 IO。您可能需要多个线程而不处理:

例如,这是一个模板程序,可以抓取和休眠以使其变慢,但在多个线程中运行,从而更快地完成任务。

from concurrent.futures import ThreadPoolExecutor
import random,time
from bs4 import BeautifulSoup as bs
import requests
URL = 'http://quotesondesign.com/wp-json/posts'
def quote_stream():
    '''
    Quoter streamer
    '''
    param = dict(page=random.randint(1, 1000))
    quo = requests.get(URL, params=param)
    if quo.ok:
        data = quo.json()
        author = data[0]['title'].strip()
        content = bs(data[0]['content'], 'html5lib').text.strip()
        print(f'{content}n-{author}n')
    else:
        print('Connection Issues :(')
def multi_qouter(workers=4):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        _ = [executor.submit(quote_stream) for i in range(workers)]
if __name__ == '__main__':
    now = time.time()
    multi_qouter(workers=4)
    print(f'Time taken {time.time()-now:.2f} seconds')

在您的情况下,创建一个函数来执行您希望从星空完成的任务。此函数将接受 url 和必要的参数作为参数。之后创建另一个函数,在不同的线程中调用前一个函数,每个线程都有其我们的 url。所以不是 iin range(..),而是网址中的网址。您可以一次运行 2000 个线程,但我更喜欢 200 个线程的块并行运行。

最新更新