Downloading images with gevent



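My task is to download more than 1M images from a given list of URLs. What is the recommended way to do this?

After reading Greenlet vs. Threads, I looked into gevent, but I cannot get it to run reliably. I experimented with a test set of 100 URLs: sometimes it finishes in 1.5 s, but sometimes it takes more than 30 s. That is strange, because the timeout* per request is 0.1 s, so even if every request timed out one after another it should never take more than 10 s.

*See the code below.

I also looked into grequests, but it seems to have problems with exception handling.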

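My "requirements" are that I can

  • inspect the errors raised while downloading (timeouts, corrupt images, ...),
  • monitor progress as the number of images processed, and
  • be as fast as possible.
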
from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
import cStringIO
import gevent

POOL_SIZE = 300  # defined but never used below


def download_image_wrapper(task):
    return download_image(task[0], task[1])


def download_image(image_url, download_path):
    raw_binary_request = requests.get(image_url, timeout=0.1).content
    image = Image.open(cStringIO.StringIO(raw_binary_request))
    image.save(download_path)


def download_images_gevent_spawn(list_of_image_urls, base_folder):
    download_paths = ['/'.join([base_folder, url.split('/')[-1]])
                      for url in list_of_image_urls]
    parameters = [[image_url, download_path] for image_url, download_path in
                  zip(list_of_image_urls, download_paths)]
    # One greenlet per URL, all started at once.
    tasks = [gevent.spawn(download_image_wrapper, parameter_tuple)
             for parameter_tuple in parameters]
    for task in tasks:
        try:
            task.get()  # re-raises any exception from the greenlet
        except Exception:
            print 'x',
            continue
        print '.',


test_urls = []  # placeholder: list of 100 urls
t1 = time()
download_images_gevent_spawn(test_urls, 'download_temp')
print time() - t1
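
A note on the timing puzzle: in Requests, timeout bounds each connection attempt and each wait for data on the socket, not the download as a whole, so a response that trickles in slowly can run well past 0.1 s. If a hard per-image limit is wanted, one option is gevent.Timeout. The following is a minimal sketch under that assumption; run_with_deadline is a hypothetical helper name, and the limit only fires when the wrapped call yields to the gevent hub (as monkey-patched socket I/O does).

```python
import gevent
from gevent import Timeout


def run_with_deadline(func, seconds, *args):
    """Run func(*args) under a hard time limit.

    Returns (True, result) on success and (False, reason) otherwise.
    Hypothetical helper, not part of gevent or Requests.
    """
    try:
        with Timeout(seconds):
            return True, func(*args)
    except Timeout:
        # gevent.Timeout derives from BaseException, so it is caught explicitly.
        return False, "deadline exceeded"
    except Exception as exc:
        return False, repr(exc)
```

With the question's code this might be used as run_with_deadline(download_image, 5, image_url, download_path), where 5 seconds is an arbitrary example value.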

I think it would be better to use urllib2, as in this example: https://github.com/gevent/gevent/blob/master/examples/concurrent_download.py#L1

Try this code; I think it is what you are asking for.

import gevent
from gevent import monkey
# Patch the stdlib (including the socket and ssl modules) to cooperate with other greenlets.
monkey.patch_all()
import sys
from time import time

if sys.version_info[0] == 3:
    from urllib.request import urlopen
else:
    from urllib2 import urlopen

# chloya_files is the answerer's own collection of URLs; it is not defined in this snippet.
urls = sorted(chloya_files)


def download_file(url):
    data = urlopen(url).read()
    img_name = url.split('/')[-1]
    with open('c:/temp/img/' + img_name, 'wb') as f:
        f.write(data)
    return True


t1 = time()
tasks = [gevent.spawn(download_file, url) for url in urls]
# Wait at most 12 seconds overall; failed or unfinished tasks keep value None.
gevent.joinall(tasks, timeout=12.0)
print("Successful: %s from %s" % (sum(1 if task.value else 0 for task in tasks), len(tasks)))
print(time() - t1)
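
The snippet above reports only a final success count. To also keep the per-URL errors and a running progress count, as the question asks, one possible approach is a bounded gevent.pool.Pool with imap_unordered, which yields results as they finish and avoids spawning one greenlet per URL for a million URLs. This is a sketch, not a tested downloader: download_all, its fetch argument (a caller-supplied function that returns the image bytes or raises) and report_every are hypothetical names introduced for illustration.

```python
from gevent.pool import Pool


def download_all(urls, fetch, pool_size=50, report_every=100):
    """Run fetch(url) for every URL with at most pool_size greenlets at a time.

    Returns (number_processed, {url: exception}) so that failures can be
    inspected afterwards. fetch is supplied by the caller (hypothetical).
    """
    def attempt(url):
        # Catch errors inside the greenlet so one bad URL cannot stop the loop.
        try:
            fetch(url)
            return url, None
        except Exception as exc:
            return url, exc

    pool = Pool(pool_size)
    done, failures = 0, {}
    for url, error in pool.imap_unordered(attempt, urls):
        done += 1
        if error is not None:
            failures[url] = error
        if done % report_every == 0:
            print("%d/%d processed, %d failed" % (done, len(urls), len(failures)))
    return done, failures
```

Here fetch could be the download_file function above; the pool size is a tuning knob, and 50 is only an example value.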

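There is a simple solution using gevent and Requests.

Use a Requests Session for persistent HTTP connections. Since gevent makes Requests asynchronous, I think there is no need for a timeout on the HTTP requests.

By default, requests.Session caches connection pools for 10 hosts (pool_connections) and keeps at most 10 connections in each pool (pool_maxsize). The defaults should be adjusted to suit your needs by explicitly creating an HTTP adapter.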

import requests

session = requests.Session()
http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
session.mount('http://', http_adapter)
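
Note that a Session also carries a default adapter for https://, so mounting a tuned adapter only on http:// leaves HTTPS URLs on the default pool sizes. A small sketch that covers both schemes follows; make_session is a hypothetical helper name, and the value 100 is simply carried over from the snippet above.

```python
import requests
from requests.adapters import HTTPAdapter


def make_session(pool_connections=100, pool_maxsize=100):
    """Build a Session whose http:// and https:// traffic share one tuned adapter."""
    session = requests.Session()
    adapter = HTTPAdapter(pool_connections=pool_connections,
                          pool_maxsize=pool_maxsize)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session
```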

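Split the work into a producer and a consumer. Downloading the images is the producer's job, and processing the images is the consumer's job.

If the image processing library (PIL) is not asynchronous, it may block the producer coroutines. In that case the consumer pool can be a gevent.threadpool.ThreadPool, e.g.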

from gevent.threadpool import ThreadPool
consumer = ThreadPool(POOL_SIZE)  
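
To illustrate the idea, here is a minimal sketch of handing blocking work to a gevent ThreadPool so that other greenlets keep running in the meantime. The checksum function is only a stand-in for image processing (an assumption for the sake of a self-contained example; it is not PIL), and process_in_threads is a hypothetical helper name.

```python
import zlib

from gevent.threadpool import ThreadPool


def checksum(data):
    # Stand-in for a blocking, non-cooperative step such as decoding an image.
    return zlib.crc32(data) & 0xffffffff


def process_in_threads(blobs, pool_size=4):
    """Run checksum over blobs in native threads; results keep the input order."""
    pool = ThreadPool(pool_size)
    try:
        return pool.map(checksum, blobs)
    finally:
        pool.kill()  # stop the worker threads once the batch is done
```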

Here is an outline of how this could be done. I have not tested the code.

from gevent import monkey; monkey.patch_all()
from time import time
import os
from io import BytesIO

import requests
from PIL import Image
from gevent.pool import Pool

try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2


def download(url):
    # Producer: return (content, file_name), or None if the request failed.
    try:
        response = session.get(url)
        response.raise_for_status()  # turn 4xx/5xx responses into exceptions
    except Exception as e:
        print(e)
        return None
    if response.status_code != requests.codes.ok:
        return None
    file_name = urlparse(url).path.rsplit('/', 1)[-1]
    return (response.content, file_name)


def process(img):
    # Consumer: decode and save one image; returns True on success, None otherwise.
    if img is None:
        return None
    img, name = img
    path = os.path.join(base_folder, name)
    try:
        Image.open(BytesIO(img)).save(path)
    except Exception as e:
        print(e)
    else:
        return True


def run(urls):
    consumer.map(process, producer.imap_unordered(download, urls))


if __name__ == '__main__':
    POOL_SIZE = 300
    producer = Pool(POOL_SIZE)
    consumer = Pool(POOL_SIZE)
    session = requests.Session()
    http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
    session.mount('http://', http_adapter)
    test_urls = []  # placeholder: list of 100 urls
    base_folder = 'download_temp'
    t1 = time()
    run(test_urls)
    print(time() - t1)
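
One detail worth considering at this scale: the outline names each file after the last path segment of its URL, so two different URLs that end in the same name would overwrite each other. A possible workaround, sketched here with a hypothetical local_path helper, is to prefix the name with a short hash of the full URL. The 10-character SHA-1 prefix and the "unnamed" fallback are arbitrary choices made for illustration.

```python
import hashlib
import os

try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2


def local_path(url, base_folder):
    """Map a URL to a file path that stays distinct for distinct URLs."""
    # Use only the path component, so a query string never ends up in the name.
    name = urlparse(url).path.rsplit("/", 1)[-1] or "unnamed"
    digest = hashlib.sha1(url.encode("utf-8")).hexdigest()[:10]
    return os.path.join(base_folder, digest + "_" + name)
```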

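I suggest taking a look at Grablib: http://grablib.org/

It is an asynchronous parser based on pycurl and multicurl. It also tries to resolve network errors automatically (for example, by retrying on timeout).

I believe the Grab:Spider module will solve 99% of your problems. http://docs.grablib.org/en/latest/index.html#spider-toc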
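
For readers who only want the retry-on-timeout behaviour mentioned above without adopting a new library, the idea can be sketched generically. This is not Grab's API: retry and its parameters are hypothetical names, and the defaults are arbitrary. IOError is used as the default because socket timeouts (on Python 3) and Requests exceptions both derive from it.

```python
import time


def retry(func, attempts=3, delay=0.5, retry_on=(IOError,)):
    """Call func() until it succeeds, at most `attempts` times.

    Sleeps `delay` seconds between tries and re-raises the last error
    if every attempt fails. Hypothetical helper, not part of any library.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on as exc:
            last_error = exc
            if attempt < attempts:
                time.sleep(delay)
    raise last_error
```

A download call can be wrapped as retry(lambda: download_file(url)); under gevent's monkey patching, time.sleep yields to other greenlets instead of blocking them.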
