如何在python中加速网页抓取



我正在为学校做一个项目,我正在努力获取有关电影的数据。我已经设法编写了一个脚本,从IMDbPY和Open Movie DB API(omdbapi.com)获取我需要的数据。我遇到的挑战是,我试图获取22305部电影的数据,每个请求大约需要0.7秒。从本质上讲,我目前的剧本大约需要8个小时才能完成。寻找任何可能同时使用多个请求的方法或任何其他建议,以显著加快获取这些数据的过程。

import urllib2
import json
import pandas as pd
import time
import imdb
start_time = time.time() #record time at beginning of script
#used to make imdb.com think we are getting this data from a browser
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
headers = { 'User-Agent' : user_agent }
#Open Movie Database Query url for IMDb IDs
url = 'http://www.omdbapi.com/?tomatoes=true&i='
#read the ids from the imdb_id csv file
imdb_ids = pd.read_csv('ids.csv')
cols = [u'Plot', u'Rated', u'tomatoImage', u'Title', u'DVD', u'tomatoMeter',
u'Writer', u'tomatoUserRating', u'Production', u'Actors', u'tomatoFresh',
u'Type', u'imdbVotes', u'Website', u'tomatoConsensus', u'Poster', u'tomatoRotten',
u'Director', u'Released', u'tomatoUserReviews', u'Awards', u'Genre', u'tomatoUserMeter',
u'imdbRating', u'Language', u'Country', u'imdbpy_budget', u'BoxOffice', u'Runtime',
u'tomatoReviews', u'imdbID', u'Metascore', u'Response', u'tomatoRating', u'Year',
u'imdbpy_gross']
#create movies dataframe
movies = pd.DataFrame(columns=cols)
i=0
for i in range(len(imdb_ids)-1):
start = time.time()
req = urllib2.Request(url + str(imdb_ids.ix[i,0]), None, headers) #request page
response = urllib2.urlopen(req) #actually call the html request
the_page = response.read() #read the json from the omdbapi query
movie_json = json.loads(the_page) #convert the json to a dict
#get the gross revenue and budget from IMDbPy
data = imdb.IMDb()
movie_id = imdb_ids.ix[i,['imdb_id']]
movie_id = movie_id.to_string()
movie_id = int(movie_id[-7:])
data = data.get_movie_business(movie_id)
data = data['data']
data = data['business']
#get the budget $ amount out of the budget IMDbPy string
try:
budget = data['budget']
budget = budget[0]
budget = budget.replace('$', '')
budget = budget.replace(',', '')
budget = budget.split(' ')
budget = str(budget[0]) 
except:
None
#get the gross $ amount out of the gross IMDbPy string
try:
budget = data['budget']
budget = budget[0]
budget = budget.replace('$', '')
budget = budget.replace(',', '')
budget = budget.split(' ')
budget = str(budget[0])
#get the gross $ amount out of the gross IMDbPy string
gross = data['gross']
gross = gross[0]
gross = gross.replace('$', '')
gross = gross.replace(',', '')
gross = gross.split(' ')
gross = str(gross[0])
except:
None
#add gross to the movies dict 
try:
movie_json[u'imdbpy_gross'] = gross
except:
movie_json[u'imdbpy_gross'] = 0
#add gross to the movies dict    
try:
movie_json[u'imdbpy_budget'] = budget
except:
movie_json[u'imdbpy_budget'] = 0
#create new dataframe that can be merged to movies DF    
tempDF = pd.DataFrame.from_dict(movie_json, orient='index')
tempDF = tempDF.T
#add the new movie to the movies dataframe
movies = movies.append(tempDF, ignore_index=True)
end = time.time()
time_took = round(end-start, 2)
percentage = round(((i+1) / float(len(imdb_ids))) * 100,1)
print i+1,"of",len(imdb_ids),"(" + str(percentage)+'%)','completed',time_took,'sec'
#increment counter
i+=1  
#save the dataframe to a csv file            
movies.to_csv('movie_data.csv', index=False)
end_time = time.time()
print round((end_time-start_time)/60,1), "min"

使用Eventlet库进行并发获取

根据评论中的建议,您应同时获取订阅源。这可以通过使用treadingmultiprocessingeventlet来完成。

安装eventlet

$ pip install eventlet

尝试eventlet中的网络爬虫示例

请参阅:http://eventlet.net/doc/examples.html#web-履带

理解eventlet的并发性

使用threading,系统会负责在线程之间进行切换。如果你必须访问一些常见的数据结构,这会带来很大的问题,因为你永远不知道哪个其他线程正在访问你的数据。然后,您开始使用同步的块、锁和信号量,只是为了同步对共享数据结构的访问。

使用eventlet会简单得多——您总是只运行一个线程,并且仅在I/O指令或其他eventlet调用时在它们之间跳转。您的其余代码不间断地运行,没有任何风险,另一个线程会破坏我们的数据。

您只需注意以下事项:

  • 所有I/O操作都必须是非阻塞的(这很容易,eventlet为您需要的大多数I/O提供了非阻塞版本)。

  • 你剩下的代码一定不能占用CPU,因为它会阻止在"绿色"线程之间切换更长的时间,"绿色"多线程的功能也会消失。

eventlet的最大优点是,它允许以简单的方式编写代码,而不会对锁、信号灯等造成太大破坏。

eventlet应用于代码

如果我理解正确的话,要获取的URL列表是预先知道的,在您的分析中处理它们的顺序并不重要。这将允许从eventlet几乎直接复制示例。我明白了,索引i有一些意义,所以您可以考虑将url和索引混合为一个元组,并将它们作为独立的作业进行处理。

当然还有其他方法,但我个人发现eventlet与其他技术相比非常容易使用,同时获得非常好的结果(尤其是获取提要)。您只需要掌握主要概念,并稍微小心地遵循eventlet的要求(保持非阻塞性)。

使用请求和eventlet请求获取url

使用requests进行异步处理的包多种多样,其中一个包使用eventlet并命名为erequests,请参阅https://github.com/saghul/erequests

URL的简单样本提取集

import erequests
# have list of urls to fetch
urls = [
'http://www.heroku.com',
'http://python-tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
# erequests.async.get(url) creates asynchronous request
async_reqs = [erequests.async.get(url) for url in urls]
# each async request is ready to go, but not yet performed
# erequests.map will call each async request to the action
# what returns processed request `req`
for req in erequests.map(async_reqs):
if req.ok:
content = req.content
# process it here
print "processing data from:", req.url

处理此特定问题的问题

我们能够获取并以某种方式处理我们需要的所有url。但在这个问题中,处理绑定到源数据中的特定记录,所以我们需要将处理后的请求与记录索引进行匹配,以便获得最终处理的进一步细节。

正如我们稍后将看到的那样,异步处理不遵循请求的顺序,有些请求处理得更快,有些则处理得更晚,map会产生完成的任何请求。

一种选择是将给定url的索引附加到请求中,然后在处理返回的数据时使用它。

获取和处理具有保留url索引的url的复杂示例

注意:下面的示例相当复杂,如果您可以使用上面提供的解决方案,请跳过这个。但请确保您没有遇到下面检测到并解决的问题(URL正在修改,请求在重定向之后)。

import erequests
from itertools import count, izip
from functools import partial
urls = [
'http://www.heroku.com',
'http://python-tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
def print_url_index(index, req, *args, **kwargs):
content_length = req.headers.get("content-length", None)
todo = "PROCESS" if req.status_code == 200 else "WAIT, NOT YET READY"
print "{todo}: index: {index}: status: {req.status_code}: length: {content_length}, {req.url}".format(**locals())
async_reqs = (erequests.async.get(url, hooks={"response": partial(print_url_index, i)}) for i, url in izip(count(), urls))
for req in erequests.map(async_reqs):
pass

根据要求连接挂钩

requests(以及erequests)允许定义名为response的事件的挂钩。每次请求得到响应时,都会调用这个钩子函数,它可以执行某些操作,甚至可以修改响应。

以下行定义了一些挂钩响应:

erequests.async.get(url, hooks={"response": partial(print_url_index, i)})

将url索引传递给hook函数

任何挂钩的签名应为func(req, *args, *kwargs)

但是我们还需要将我们正在处理的url的索引传递到hook函数中。

为此,我们使用functools.partial,它允许通过将一些参数固定为特定值来创建简化的函数。这正是我们所需要的,如果您看到print_url_index签名,我们只需要固定index的值,其余的将符合钩子函数的要求。

在我们的调用中,我们使用partial和简化函数print_url_index的名称,并为每个url提供它的唯一索引

索引可以由enumerate在循环中提供,在参数数量较多的情况下,我们可以以更高效的方式工作,并使用count,默认情况下,它会生成从0开始的每次递增的数字。

让我们运行它:

$ python ereq.py
WAIT, NOT YET READY: index: 3: status: 301: length: 66, http://python-requests.org/
WAIT, NOT YET READY: index: 4: status: 301: length: 58, http://kennethreitz.com/
WAIT, NOT YET READY: index: 0: status: 301: length: None, http://www.heroku.com/
PROCESS: index: 2: status: 200: length: 7700, http://httpbin.org/
WAIT, NOT YET READY: index: 1: status: 301: length: 64, http://python-tablib.org/
WAIT, NOT YET READY: index: 4: status: 301: length: None, http://kennethreitz.org
WAIT, NOT YET READY: index: 3: status: 302: length: 0, http://docs.python-requests.org
WAIT, NOT YET READY: index: 1: status: 302: length: 0, http://docs.python-tablib.org
PROCESS: index: 3: status: 200: length: None, http://docs.python-requests.org/en/latest/
PROCESS: index: 1: status: 200: length: None, http://docs.python-tablib.org/en/latest/
PROCESS: index: 0: status: 200: length: 12064, https://www.heroku.com/
PROCESS: index: 4: status: 200: length: 10478, http://www.kennethreitz.org/

这表明:

  • 请求未按生成顺序处理
  • 有些请求遵循重定向,因此钩子函数被多次调用
  • 仔细检查url值,我们可以看到,响应没有报告原始列表urls中的url,即使对于索引2,我们也附加了额外的/。这就是为什么在原始url列表中简单查找响应url对我们没有帮助的原因

当web抓取时,我们通常会遇到两种类型的瓶颈:

  1. IO块-每当我们提出请求时,我们都需要等待服务器做出响应,这可能会阻塞我们的整个程序
  2. CPU块-在解析web抓取内容时,我们的代码可能会受到CPU处理能力的限制

CPU速度

CPU块是一个简单的解决方案-我们可以生成更多的进程。通常,一个CPU核心可以有效地处理一个进程。因此,如果我们的刮刀在一台有12个CPU核心的机器上运行,我们可以产生12个进程来提高12倍的速度:

from concurrent.futures import ProcessPoolExecutor
def parse(html):
...  # CPU intensive parsing

htmls = [...]
with ProcessPoolExecutor() as executor:
for result in executor.map(parse, htmls):
print(result)

Python的ProcessPooolExecutor生成最佳数量的线程(相当于CPU内核),并通过它们分配任务。

IO速度

对于IO阻塞,我们有更多的选择,因为我们的目标是消除无用的等待,这些等待可以通过线程、进程和异步循环来完成。

如果我们发出数千个请求,我们就无法生成数百个进程。线程的成本会更低,但仍然有一个更好的选择-异步循环

异步循环可以不按特定顺序执行任务。换句话说,当任务A被阻止时,任务B可以接管程序。这非常适合网络抓取,因为计算开销很小。我们可以在一个程序中扩展到数千个请求。

不幸的是,要使asycio工作,我们需要使用支持asyncio的python包。例如,通过使用httpx和asyncio,我们可以显著加快我们的抓取速度:

# comparing synchronous `requests`:
import requests
from time import time
_start = time()
for i in range(50):
request.get("http://httpbin.org/delay/1")
print(f"finished in: {time() - _start:.2f} seconds")
# finished in: 52.21 seconds
# versus asynchronous `httpx`
import httpx
import asyncio
from time import time
_start = time()
async def main():
async with httpx.AsyncClient() as client:
tasks = [client.get("http://httpbin.org/delay/1") for i in range(50)]
for response_future in asyncio.as_completed(tasks):
response = await response_future
print(f"finished in: {time() - _start:.2f} seconds")
asyncio.run(main())
# finished in: 3.55 seconds

组合使用两者

使用异步代码,我们可以避免IO块,使用进程,我们可以扩大CPU密集型解析-优化web抓取的完美组合:

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time
import httpx

async def scrape(urls):
"""this is our async scraper that scrapes"""
results = []
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
scrape_tasks = [client.get(url) for url in urls]
for response_f in asyncio.as_completed(scrape_tasks):
response = await response_f
# emulate data parsing/calculation
sleep(0.5)
...
results.append("done")
return results

def scrape_wrapper(args):
i, urls = args
print(f"subprocess {i} started")
result = asyncio.run(scrape(urls))
print(f"subprocess {i} ended")
return result

def multi_process(urls):
_start = time()
batches = []
batch_size = multiprocessing.cpu_count() - 1  # let's keep 1 core for ourselves
print(f"scraping {len(urls)} urls through {batch_size} processes")
for i in range(0, len(urls), batch_size):
batches.append(urls[i : i + batch_size])
with ProcessPoolExecutor() as executor:
for result in executor.map(scrape_wrapper, enumerate(batches)):
print(result)
print("done")
print(f"multi-process finished in {time() - _start:.2f}")
def single_process(urls):
_start = time()
results = asyncio.run(scrape(urls))
print(f"single-process finished in {time() - _start:.2f}")

if __name__ == "__main__":
urls = ["http://httpbin.org/delay/1" for i in range(100)]
multi_process(urls)
# multi-process finished in 7.22
single_process(urls)
# single-process finished in 51.28

这些基础概念听起来很复杂,但一旦你把它缩小到问题的根源,修复就非常直接,并且已经在Python中出现了!

有关这个主题的更多详细信息,请参阅我的博客Web Scratching Speed:进程、线程和异步

最新更新