2015-01-29 3 views
3

Мне нужно сделать запросы на 100 тыс. Голов, и я использую gevent поверх запросов. Мой код работает некоторое время, но потом в конце концов зависает. Я не уверен, почему он висит, или он висит внутри запросов или gevent. Я использую аргумент таймаута внутри обоих запросов и gevent.gevent/requests зависает, делая много головных запросов

Пожалуйста, взгляните на мой фрагмент кода ниже и дайте мне знать, что я должен изменить.

import gevent 
from gevent import monkey, pool 
monkey.patch_all() 
import requests 

def get_head(url, timeout=3): 
    try: 
     return requests.head(url, allow_redirects=True, timeout=timeout) 
    except: 
     return None 

def expand_short_urls(short_urls, chunk_size=100, timeout=60*5): 
    chunk_list = lambda l, n: (l[i:i+n] for i in range(0, len(l), n)) 
    p = pool.Pool(chunk_size) 
    print 'Expanding %d short_urls' % len(short_urls) 
    results = {} 
    for i, _short_urls_chunked in enumerate(chunk_list(short_urls, chunk_size)): 
     print '\t%d. processing %d urls @ %s' % (i, chunk_size, str(datetime.datetime.now())) 
     jobs = [p.spawn(get_head, _short_url) for _short_url in _short_urls_chunked] 
     gevent.joinall(jobs, timeout=timeout) 
     results.update({_short_url:job.get().url for _short_url, job in zip(_short_urls_chunked, jobs) if job.get() is not None and job.get().status_code==200}) 
    return results 

Я попытался grequests, но это было заброшено, и я прошел через выдвижные запросы GitHub, но все они имеют проблемы тоже.

+0

Является ли 'gevent' трудным требованием? – That1Guy

+0

nope. Я думал о переходе к торнадо. gevent также съедает мой баран. – vgoklani

+0

Похоже, вы используете 'gevent', чтобы просто иметь пул работников. Это верно? Будет ли тогда еще один пул сортов или есть что-то конкретное о 'gevent' и' tornado', которое вам нужно? – That1Guy

ответ

8

Использование оперативной памяти вы наблюдаете в основном происходит из всех данных который накапливается при сохранении 100 000 объектов ответа и всех основных накладных расходов. Я воспроизвел ваше заявление и уволил запросы HEAD против 15000 URL-адресов из верхней оценки Alexa. Это не имеет особого значения

  • используется ли я GEvent Pool (т.е. один greenlet за соединение) или фиксированный набор greenlets, все запрашивающие несколько URL-адресов
  • насколько большой я установил размер пула

В конце концов, использование ОЗУ увеличилось со временем, до значительных сумм. Тем не менее, я заметил, что переход от requests к urllib2 уже приводит к сокращению использования ОЗУ примерно на два. То есть, я заменил

result = requests.head(url) 

с

request = urllib2.Request(url) 
request.get_method = lambda : 'HEAD' 
result = urllib2.urlopen(request) 

Некоторые другие советы: не использовать два механизма тайм-аута. Тайм-аут подход GEvent является очень прочной, и вы можете легко использовать его как это:

def gethead(url): 
    result = None 
    try: 
     with Timeout(5, False): 
      result = requests.head(url) 
    except Exception as e: 
     result = e 
    return result 

Может выглядеть сложным, но возвращает либо None (после того, как достаточно точно 5 секунд, и указывает тайм-аут), любой объект исключения, представляющий собой коммуникационную ошибку , или ответ. Прекрасно работает!

Хотя это, вероятно, не является частью проблемы, в таких случаях я рекомендую сохранить работников живым и позволить им работать с несколькими элементами каждый! Накладные порождения зеленых зелени на самом деле малы. Тем не менее, это было бы очень простым решением с набором долгоживущих greenlets:

def qworker(qin, qout): 
    while True: 
     try: 
      qout.put(gethead(qin.get(block=False))) 
     except Empty: 
      break 

qin = Queue() 
qout = Queue() 

for url in urls: 
    qin.put(url) 

workers = [spawn(qworker, qin, qout) for i in xrange(POOLSIZE)] 
joinall(workers) 
returnvalues = [qout.get() for _ in xrange(len(urls))] 

Кроме того, вам действительно нужно понимать, что это широкомасштабная проблема, которую вы беретесь там, получая нестандартные вопросы ,Когда я воспроизвел ваш сценарий с временем ожидания 20 с и 100 рабочих и 15000 URL, чтобы запросить, я легко получил большое количество розеток:

# netstat -tpn | wc -l 
10074 

То есть, операционная система была более чем 10000 розетки для управления, большинство из них в состоянии TIME_WAIT. Я также заметил ошибки «Слишком много открытых файлов» и настроил ограничения, используя sysctl. Когда вы запрашиваете 100 000 URL-адресов, вы, вероятно, тоже столкнетесь с такими ограничениями, и вам нужно придумать меры по предотвращению голодания системы.

Также обратите внимание на то, как вы используете запросы, он автоматически следует переадресации с HTTP на HTTPS и автоматически проверяет сертификат, который, безусловно, стоит ОЗУ.

В моих измерениях, когда я разделял количество запрошенных URL-адресов по времени выполнения программы, я почти никогда не пропускал 100 ответов/с, что является результатом соединений с высокой задержкой с зарубежными серверами по всему миру. Я полагаю, вы также пострадали от такого предела. Отрегулируйте остальную часть архитектуры до этого предела, и вы, вероятно, сможете генерировать поток данных из Интернета на диск (или базу данных) с не столь большим объемом использования ОЗУ между ними.

я должен адресовать ваши два основных вопроса, а именно:

Я думаю GEvent/как вы используете это не ваша проблема. Я думаю, вы просто недооцениваете сложность своей задачи. Это происходит вместе с неприятными проблемами и приводит к ограничению вашей системы.

  • оперативная память проблема использования: Начните с помощью urllib2, если вы можете. Затем, если вещи накапливаются еще слишком высоко, вам нужно работать против накопления. Постарайтесь создать устойчивое состояние: вы можете начать записывать данные на диск и, как правило, работать в ситуации, когда объекты могут стать мусором.

  • Ваш код «в конечном итоге зависает»: возможно, это касается проблемы с оперативной памятью. Если это не так, то не создавайте столько зеленых, но повторного использования их, как указано. Кроме того, дальнейшее сокращение параллелизма, отслеживание количества открытых сокетов, увеличение системных ограничений, если необходимо, и попытаться выяснить точно, где ваше программное обеспечение зависает.

+1

Благодарим вас за красноречивый ответ. – vgoklani

+1

@vgoklani: спасибо за прекрасный комментарий :-) –

+0

:) Кстати, я использую фьючерсы сейчас, гораздо эффективнее. – vgoklani

1

Я не уверен, что это решит вашу проблему, но вы не используете pool.Pool() правильно.

Попробуйте это:

def expand_short_urls(short_urls, chunk_size=100): 
    # Pool() automatically limits your process to chunk_size greenlets running concurrently 
    # thus you don't need to do all that chunking business you were doing in your for loop 
    p = pool.Pool(chunk_size) 
    print 'Expanding %d short_urls' % len(short_urls) 

    # spawn() (both gevent.spawn() and Pool.spawn()) returns a gevent.Greenlet object 
    # NOT the value your function, get_head, will return 
    threads = [p.spawn(get_head, short_url) for short_url in short_urls] 
    p.join() 

    # to access the returned value of your function, access the Greenlet.value property 
    results = {short_url: thread.value.url for short_url, thread in zip(short_urls, threads) 

если thread.value не является None и thread.value.status_code == 200} возвращение результатов