2015-11-04 6 views
5

Моя задача - загрузить 1M + изображения из заданного списка URL-адресов. Каков рекомендуемый способ сделать это?Загрузка изображений с gevent

После того, как я прочитал Greenlet Vs. Threads Я просмотрел gevent, но я не могу надежно его запустить. Я играл с тестовым набором из 100 URL-адресов, и иногда он заканчивается в 1,5 секунды, но иногда это занимает более 30 секунд, что странно, поскольку таймаут * для каждого запроса равен 0,1, поэтому он никогда не должен принимать более 10 секунд.

* смотрите ниже в коде

Я также посмотрел в grequests, но они, кажется, issues with exception handling.

Мои «требования» являются, что я могу

  • проанализировать ошибки, возникающих при загрузке (таймауты , поврежденные изображения ...),
  • отслеживает ход обработки обработанных изображений и
  • быть как можно быстрее.
from gevent import monkey; monkey.patch_all() 
from time import time 
import requests 
from PIL import Image 
import cStringIO 
import gevent.hub 
POOL_SIZE = 300 


def download_image_wrapper(task): 
    return download_image(task[0], task[1]) 

def download_image(image_url, download_path): 
    raw_binary_request = requests.get(image_url, timeout=0.1).content 
    image = Image.open(cStringIO.StringIO(raw_binary_request)) 
    image.save(download_path) 

def download_images_gevent_spawn(list_of_image_urls, base_folder): 
    download_paths = ['/'.join([base_folder, url.split('/')[-1]]) 
         for url in list_of_image_urls] 
    parameters = [[image_url, download_path] for image_url, download_path in 
      zip(list_of_image_urls, download_paths)] 
    tasks = [gevent.spawn(download_image_wrapper, parameter_tuple) for parameter_tuple in parameters] 
    for task in tasks: 
     try: 
      task.get() 
     except Exception: 
      print 'x', 
      continue 
     print '.', 

test_urls = # list of 100 urls 

t1 = time() 
download_images_gevent_spawn(test_urls, 'download_temp') 
print time() - t1 
+2

Нужно ли использовать темы? Если вы можете использовать несколько процессов, вы можете сделать это с помощью 'multiprocessing.Pool', и вы можете найти его проще. Я использую 'pool.map (download_image, url_list)' и 'pool.join()', чтобы сделать что-то подобное. – foz

+1

@foz, спасибо, но я также попробовал «multiprocessing.Pool» с аналогичными проблемами. Также мне сказали, что «многопроцессорство» не является подходящим инструментом для таких задач: http://stackoverflow.com/a/27016937/380038 – Framester

+0

Интересно! Я вижу, что многопроцессорность не так эффективна/масштабируема, но я не понимаю, почему она не должна работать со скромным размером пула (32, как и у вас). Надеюсь, вы получите хороший ответ на это, я думаю, что я тоже кое-что узнаю! – foz

ответ

-1

Я предлагаю обратить внимание на Grablib http://grablib.org/

Это асинхронное анализатор на основе pycurl и multicurl. Также он пытается автоматически разрешить сетевую ошибку (например, повторите попытку, если тайм-аут и т. Д.).

Я считаю, что модуль захвата: Паук будет решать ваши проблемы на 99%. http://docs.grablib.org/en/latest/index.html#spider-toc

+0

Спасибо. Можете ли вы объяснить, что делает grablib по-другому или почему у вас есть идея, почему он будет работать лучше, чем мой подход? – Framester

+0

У вас есть прямые ссылки на изображения? Если да, то извините, вы все равно можете использовать Grab или что угодно. Grablib идеально подходит для ползания и разбора. Однако вы также можете использовать его для загрузки изображений, Grablib (специально для модуля Grab: Spider) повторяет задачи, где сетевая ошибка была> 400 и! = 404. Количество попыток может быть установлено вручную. Он имеет протоколирование и мониторинг процесса. –

1

Я думаю, что это будет лучше придерживаться urllib2, на примере https://github.com/gevent/gevent/blob/master/examples/concurrent_download.py#L1

Попробуйте этот код, я полагаю, это то, что вы просите.

import gevent 
from gevent import monkey 

# patches stdlib (including socket and ssl modules) to cooperate with other greenlets 
monkey.patch_all() 

import sys 

urls = sorted(chloya_files) 

if sys.version_info[0] == 3: 
    from urllib.request import urlopen 
else: 
    from urllib2 import urlopen 


def download_file(url): 
    data = urlopen(url).read() 
    img_name = url.split('/')[-1] 
    with open('c:/temp/img/'+img_name, 'wb') as f: 
     f.write(data) 
    return True 


from time import time 

t1 = time() 
tasks = [gevent.spawn(download_file, url) for url in urls] 
gevent.joinall(tasks, timeout = 12.0) 
print "Sucessful: %s from %s" % (sum(1 if task.value else 0 for task in tasks), len(tasks)) 
print time() - t1 
+0

Спасибо, я пробовал этот код с 'urlopen (..., timeout = 0.1)', но он по-прежнему занимал более 100 секунд для 1000 URL-адресов, что указывает на то, что он не выполнял запросы параллельно. – Framester

+0

Возможно, это проблемы с сетью? В моем тесте потребовалось 10,1 секунды для 139 файлов с какого-то чешского сайта. У меня также были сомнения относительно параллелизма, но теперь я думаю, что я был ограничен удаленным веб-сервером, а не gevent-urlib2 – Ingaz

1

Там простое решение с помощью gevent и Requestssimple-requests

Использование RequestsSession для HTTP постоянного соединения. Поскольку gevent делает Requests асинхронным, я думаю, что нет необходимости в timeout в запросах HTTP.

По умолчанию requests.Session кэшей TCP соединений (pool_connections) для 10 хостов и пределов 10 одновременных запросов HTTP за кэшированными соединения TCP (pool_maxsize). Конфигурация по умолчанию должна быть изменена в соответствии с необходимостью, явно создав адаптер http.

session = requests.Session() 
http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) 
session.mount('http://', http_adapter) 

Прерывать задачи как производитель-потребитель. Загрузка изображений - задача производителя, а обработка изображений - это потребительская задача.

Если библиотека обработки изображений PIL не является асинхронной, она может блокировать сопрограммы производителей. Если это так, потребительский пул может быть gevent.threadpool.ThreadPool. F.E.

from gevent.threadpool import ThreadPool 
consumer = ThreadPool(POOL_SIZE) 

Это обзор того, как это можно сделать. Я не тестировал код.

from gevent import monkey; monkey.patch_all() 
from time import time 
import requests 
from PIL import Image 
from io import BytesIO 
import os 
from urlparse import urlparse 
from gevent.pool import Pool 

def download(url): 
    try: 
     response = session.get(url) 
    except Exception as e: 
     print(e) 
    else: 
     if response.status_code == requests.codes.ok: 
      file_name = urlparse(url).path.rsplit('/',1)[-1] 
      return (response.content,file_name) 
     response.raise_for_status() 

def process(img): 
    if img is None: 
     return None 
    img, name = img 
    img = Image.open(BytesIO(img)) 
    path = os.path.join(base_folder, name) 
    try: 
     img.save(path) 
    except Exception as e: 
     print(e) 
    else: 
     return True 

def run(urls):   
    consumer.map(process, producer.imap_unordered(download, urls)) 

if __name__ == '__main__': 
     POOL_SIZE = 300 
     producer = Pool(POOL_SIZE) 
     consumer = Pool(POOL_SIZE) 

     session = requests.Session() 
     http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) 
     session.mount('http://', http_adapter) 

     test_urls = # list of 100 urls 
     base_folder = 'download_temp' 
     t1 = time() 
     run(test_urls) 
     print time() - t1 
+0

Спасибо за ваше предложение. Я пробовал свой код на своих URL-адресах, но для URL-адресов 1 тыс. Требуется> 200 с. Одна из проблем может заключаться в том, что большинство из них указывают на один домен, но многие из них также указывают на разные домены. – Framester

+0

Сколько времени вы считаете нужным? размер файла, пропускная способность клиента и загрузка сервера все играют роль в таймингах. –

+0

Я обновил свой ответ, предложив использовать «ThreadPool» для потребителей. Если обработка изображения связана с cpu, вы должны использовать 'multiprocessing.Pool'. –