我的任务是从给定的URL列表中下载1M以上的图像。建议的方法是什么?
在阅读了Greenlet Vs.Threads之后,我研究了gevent
,但未能使其可靠地运行。我玩了一个由100个url组成的测试集,有时它在1.5秒内完成,但有时它需要超过30秒,这很奇怪,因为每个请求的超时*是0.1,所以它永远不会超过10秒。
*参见以下代码
我还研究了grequests
,但它们似乎在异常处理方面存在问题。
我的"要求"是我可以
- 检查下载时出现的错误(超时、损坏的图像…)
- 监控已处理图像数量的进度,以及
- 尽可能快
from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
import cStringIO
import gevent.hub
POOL_SIZE = 300
def download_image_wrapper(task):
return download_image(task[0], task[1])
def download_image(image_url, download_path):
raw_binary_request = requests.get(image_url, timeout=0.1).content
image = Image.open(cStringIO.StringIO(raw_binary_request))
image.save(download_path)
def download_images_gevent_spawn(list_of_image_urls, base_folder):
download_paths = ['/'.join([base_folder, url.split('/')[-1]])
for url in list_of_image_urls]
parameters = [[image_url, download_path] for image_url, download_path in
zip(list_of_image_urls, download_paths)]
tasks = [gevent.spawn(download_image_wrapper, parameter_tuple) for parameter_tuple in parameters]
for task in tasks:
try:
task.get()
except Exception:
print 'x',
continue
print '.',
test_urls = # list of 100 urls
t1 = time()
download_images_gevent_spawn(test_urls, 'download_temp')
print time() - t1
我认为最好使用urllib2,例如https://github.com/gevent/gevent/blob/master/examples/concurrent_download.py#L1
试试这个代码,我想这就是你想要的。
import gevent
from gevent import monkey
# patches stdlib (including socket and ssl modules) to cooperate with other greenlets
monkey.patch_all()
import sys
urls = sorted(chloya_files)
if sys.version_info[0] == 3:
from urllib.request import urlopen
else:
from urllib2 import urlopen
def download_file(url):
data = urlopen(url).read()
img_name = url.split('/')[-1]
with open('c:/temp/img/'+img_name, 'wb') as f:
f.write(data)
return True
from time import time
t1 = time()
tasks = [gevent.spawn(download_file, url) for url in urls]
gevent.joinall(tasks, timeout = 12.0)
print "Sucessful: %s from %s" % (sum(1 if task.value else 0 for task in tasks), len(tasks))
print time() - t1
有一个使用gevent
和Requests
的简单请求的简单解决方案
使用Requests
会话进行HTTP持久连接。由于gevent
使Requests
异步,我认为在HTTP请求中不需要timeout
。
默认情况下,requests.Session
缓存10个主机的TCP连接(pool_connections
),并限制每个缓存的TCP连接10个并发HTTP请求(pool_maxsize
)。应该通过显式创建http适配器来调整默认配置以满足需要。
session = requests.Session()
http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
session.mount('http://', http_adapter)
将任务分解为生产者-消费者。图像下载是生产者的任务,图像处理是消费者的任务。
如果图像处理库PIL
不是异步的,则它可以阻止生产者协程。如果是,则使用者池可以是gevent.threadpool.ThreadPool
。f.e.
from gevent.threadpool import ThreadPool
consumer = ThreadPool(POOL_SIZE)
这是如何做到这一点的概述。我没有测试代码。
from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
from io import BytesIO
import os
from urlparse import urlparse
from gevent.pool import Pool
def download(url):
try:
response = session.get(url)
except Exception as e:
print(e)
else:
if response.status_code == requests.codes.ok:
file_name = urlparse(url).path.rsplit('/',1)[-1]
return (response.content,file_name)
response.raise_for_status()
def process(img):
if img is None:
return None
img, name = img
img = Image.open(BytesIO(img))
path = os.path.join(base_folder, name)
try:
img.save(path)
except Exception as e:
print(e)
else:
return True
def run(urls):
consumer.map(process, producer.imap_unordered(download, urls))
if __name__ == '__main__':
POOL_SIZE = 300
producer = Pool(POOL_SIZE)
consumer = Pool(POOL_SIZE)
session = requests.Session()
http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
session.mount('http://', http_adapter)
test_urls = # list of 100 urls
base_folder = 'download_temp'
t1 = time()
run(test_urls)
print time() - t1
我建议关注Grablibhttp://grablib.org/
它是一个基于pycurl和multicrl的异步解析器。它还尝试自动解决网络错误(如超时时重试等)。
我相信Grab:Spider模块将99%地解决您的问题。http://docs.grablib.org/en/latest/index.html#spider-toc