2016-02-08 6 views
0

Я пытаюсь очистить огромное количество URL-адресов (приблизительно 3 миллиона), которые содержат данные в формате JSON в кратчайшие сроки. Для этого у меня есть код Python (python 3), который использует Queue, Multithreading и Urllib3. Все работает нормально в течение первых 3 минут, затем код начинает замедляться, тогда он, кажется, полностью застрял. Я прочитал все, что мог найти по этому вопросу, но, к сожалению, решение, похоже, требует знания, которое находится далеко за пределами меня.Скребок с многопоточной очередью + urllib3 страдает резким замедлением

Я попытался ограничить количество потоков: ничего не исправить. Я также попытался ограничить максимальную скорость моей очереди и изменить тайм-аут сокета, но он тоже не помог. Далекий сервер не блокирует меня и не вносит в черный список меня, так как я могу повторно запустить свой сценарий в любое время, когда хочу с хорошими результатами в начале (код начинает замедляться в довольно случайное время). Кроме того, иногда мое интернет-соединение, похоже, сокращается - поскольку я не могу заниматься серфингом на любом веб-сайте, но эта конкретная проблема не появляется каждый раз.

Вот мой код (просто на меня, пожалуйста, я новичок):

#!/usr/bin/env python 
import urllib3,json,csv 
from queue import Queue 
from threading import Thread 

csvFile = open("X.csv", 'wt',newline="") 
writer = csv.writer(csvFile,delimiter=";") 
     writer.writerow(('A','B','C','D')) 

def do_stuff(q): 
    http = urllib3.connectionpool.connection_from_url('http://www.XXYX.com/',maxsize=30,timeout=20,block=True) 

    while True: 

     try: 

      url = q.get() 
      url1 = http.request('GET',url)  

      doc = json.loads(url1.data.decode('utf8')) 

      writer.writerow((doc['A'],doc['B'], doc['C'],doc['D'])) 

     except: 
      print(url) 

     finally: 
      q.task_done() 

q = Queue(maxsize=200) 
num_threads = 15 

for i in range(num_threads): 
    worker = Thread(target=do_stuff, args=(q,)) 
    worker.setDaemon(True) 
    worker.start() 

for x in range(1,3000000): 
    if x < 10: 
     url = "http://www.XXYX.com/?i=" + str(x) + "&plot=short&r=json" 
    elif x < 100: 
     url = "http://www.XXYX.com/?i=tt00000" + str(x) + "&plot=short&r=json" 
    elif x < 1000: 
     url = "http://www.XXYX.com/?i=0" + str(x) + "&plot=short&r=json" 
    elif x < 10000: 
     url = "http://www.XXYX.com/?i=00" + str(x) + "&plot=short&r=json" 
    elif x < 100000: 
     url = "http://www.XXYX.com/?i=000" + str(x) + "&plot=short&r=json" 
    elif x < 1000000: 
     url = "http://www.XXYX.com/?i=0000" + str(x) + "&plot=short&r=json" 
    else: 
     url = "http://www.XXYX.com/?i=00000" + str(x) + "&plot=short&r=json" 

    q.put(url) 

q.join()  
csvFile.close() 
print("done") 
+0

Я не вижу ничего плохого в вашем коде. Весьма вероятно, что сервер, на котором вы стучите, блокирует или блокирует ваши соединения. Удостоверьтесь, что вы уважаете robots.txt и имеете разумное соотношение прав или имеете явное SLA с сайтом для его обхода. В противном случае, чтобы обойти автоматическое дросселирование, вам, вероятно, придется сканировать со многих IP-адресов. – shazow

+0

Как вы думаете, достаточно ли реализовать время сна в do_stuff()? Если да, то, как долго, по вашему мнению, позволит мне закончить сканирование? –

ответ

0

Как сказал shazow, это не дело потоков, но тайм-ауты, при котором каждый поток получает данные от сервера , Попробуйте включить некоторый timout в вашем коде:

finally: 
    sleep(50) 
    q.task_done() 

это также может быть улучшено путем генерации адаптивных таймаутов, например, Вы можете измерить, сколько данных вы успешно получили, и если это число уменьшается, увеличение времени сна, и вице- versa

+0

Это довольно интересно. Я буду контролировать данные, которые получаю за 1 минуту/за поток. Если это число уменьшится, поток будет спать в течение нескольких секунд * количество времени, которое оно уже спало (при этом время сна будет увеличиваться в зависимости от производительности потока - при необходимости) - иначе он продолжит свою работу и сбросит число пауз в один. –

+0

@ lo-bellin, я думаю, что это не сильно повлияло бы на производительность, чтобы выбрать более низкий интервал, например 5 секунд. Функции 'perf_counter' и' process_time' были добавлены в библиотеку _time_, они имеют большую точность. Например, алгоритм мог вычислять время сна, используя формулу sleep_t = const/(took2 - took1), чтобы он имел дело с чем-то вроде производной времени, затрачиваемого на обработку (что точно соответствует скорости выполнения). Таким образом, алгоритм должен быть гибким. – thodnev

+0

Я считаю, что мы добрались до понятия «знания, которое лежит далеко за пределами меня». Не возражаете ли вы мне объяснить немного более конкретный пример, пожалуйста? Поскольку я не понимаю, как «sleep_t = const/(took2 - took1)» будет гарантировать, что времени сна достаточно, чтобы код не забил сайт - –