2012-02-08 4 views
1

Я запрограммировал скрипт, который должен разрешать несколько имен хостов в IP-адресах с использованием многопоточности.Python socket.gethostbyname_ex() multithread failed

Однако он не работает и замерзает в некоторой случайной точке. Как это можно решить?

num_threads = 100 
conn = pymysql.connect(host='xx.xx.xx.xx', unix_socket='/tmp/mysql.sock', user='user', passwd='pw', db='database') 
cur = conn.cursor() 
def mexec(befehl): 
    cur = conn.cursor() 
    cur.execute(befehl) 

websites = ['facebook.com','facebook.org' ... ... ... ...] \#10.000 websites in array 
queue = Queue() 
def getips(i, q): 
    while True: 
     #--resolve IP-- 
     try: 
      result = socket.gethostbyname_ex(site) 
      print(result) 
      mexec("UPDATE sites2block SET ip='"+result+"', updated='yes' ") #puts site in mysqldb 
     except (socket.gaierror): 
      print("no ip") 
      mexec("UPDATE sites2block SET ip='no ip', updated='yes',") 
     q.task_done() 
#Spawn thread pool 
for i in range(num_threads): 
    worker = Thread(target=getips, args=(i, queue)) 
    worker.setDaemon(True) 
    worker.start() 
#Place work in queue 
for site in websites: 
    queue.put(site) 
#Wait until worker threads are done to exit 
queue.join() 
+0

Какие ошибки вы получаете? –

+0

sry, забыл принять! Я не получаю конкретных ошибок, сценарий работает и в какой-то момент просто зависает, не показывая никаких конкретных ошибок. Затем я должен убить оболочку. – user670186

+0

Пример неполный - что такое 'mexec '? –

ответ

3

Вы можете использовать значение дозорного сигнализировать потоков, там нет работы и присоединиться к нити вместо queue.task_done() и queue.join():

#!/usr/bin/env python 
import socket 
from Queue import Queue 
from threading import Thread 

def getips(queue): 
    for site in iter(queue.get, None): 
     try: # resolve hostname 
      result = socket.gethostbyname_ex(site) 
     except IOError, e: 
      print("error %s reason: %s" % (site, e)) 
     else: 
      print("done %s %s" % (site, result)) 

def main(): 
    websites = "youtube google non-existent.example facebook yahoo live".split() 
    websites = [name+'.com' for name in websites] 

    # Spawn thread pool 
    queue = Queue() 
    threads = [Thread(target=getips, args=(queue,)) for _ in range(20)] 
    for t in threads: 
     t.daemon = True 
     t.start() 

    # Place work in queue 
    for site in websites: queue.put(site) 
    # Put sentinel to signal the end 
    for _ in threads: queue.put(None) 
    # Wait for completion 
    for t in threads: t.join() 

main() 

gethostbyname_ex() функция устарела. Чтобы поддерживать оба адреса IPv4/v6, вместо этого вы можете использовать socket.getaddrinfo().

+0

его очереди очереди очереди. Этот код также страдает! Благодаря! – user670186

+1

Python 2.x использует 'from Queue import Queue'. Python 3.x - 'from queue import Queue', чтобы выполнить соглашение об именах модулей [pep-8] (http://www.python.org/dev/peps/pep-0008/). Чтобы избежать путаницы, вы можете использовать тег 'python-3.x', когда задаете вопрос о' python-3.x'. – jfs

1

Моя первая мысль была, что вы получите ошибки из-за перегрузки на DNS - возможно, ваш распознаватель просто не позволяет вам делать больше, чем определенное количество запросов на время.


Кроме того, я заметил некоторые проблемы:

  1. Вы забыли правильно назначить site в цикле while - который, вероятно, лучше заменить итерацией в for цикл по очереди, или что-то. В вашей версии вы используете переменную site из пространства имен уровня модуля, что может привести к тому, что запросы будут выполняться дважды, а другие пропущены.

    В этом месте у вас есть контроль над тем, что в очереди все еще есть записи или их ждет. Если оба нет, вы можете выйти из своей нити.

  2. По соображениям безопасности, вы бы лучше сделать

    def mexec(befehl, args=None): 
        cur = conn.cursor() 
        cur.execute(befehl, args) 
    

    для того, чтобы сделать потом

    mexec("UPDATE sites2block SET ip=%s, updated='yes'", result) #puts site in mysqldb 
    

Для того, чтобы остаться совместимым с будущими протоколами, вы должны использовать socket.getaddrinfo() вместо socket.gethostbyname_ex(site). Там вы получаете все IP-адреса, которые вы хотите (сначала вы можете ограничить IPv4, но проще переключиться на IPv6) и, возможно, поместить их в базу данных.


Для очереди, примеры кода может быть

def queue_iterator(q): 
    """Iterate over the contents of a queue. Waits for new elements as long as the queue is still filling.""" 
    while True: 
     try: 
      item = q.get(block=q.is_filling, timeout=.1) 
      yield item 
      q.task_done() # indicate that task is done. 
     except Empty: 
      # If q is still filling, continue. 
      # If q is empty and not filling any longer, return. 
      if not q.is_filling: return 

def getips(i, q): 
    for site in queue_iterator(q): 
     #--resolve IP-- 
     try: 
      result = socket.gethostbyname_ex(site) 
      print(result) 
      mexec("UPDATE sites2block SET ip=%s, updated='yes'", result) #puts site in mysqldb 
     except (socket.gaierror): 
      print("no ip") 
      mexec("UPDATE sites2block SET ip='no ip', updated='yes',") 
# Indicate it is filling. 
q.is_filling = True 
#Spawn thread pool 
for i in range(num_threads): 
    worker = Thread(target=getips, args=(i, queue)) 
    worker.setDaemon(True) 
    worker.start() 
#Place work in queue 
for site in websites: 
    queue.put(site) 
queue.is_filling = False # we are done filling, if q becomes empty, we are done. 
#Wait until worker threads are done to exit 
queue.join() 

должен сделать трюк.


Еще одна проблема заключается в параллельной вставке в MySQL. Вам разрешено выполнять только один запрос MySQL за раз. Таким образом, вы можете либо защитить доступ через threading.Lock(), либо RLock(), либо вы можете отправить ответы в другую очередь, которая обрабатывается другим потоком, который может даже связывать их.

+0

Привет, спасибо! Для 1. вы можете отправить исправленный код, я просто не получаю его на работу ... – user670186

+0

@ user670186 Готово. Просто исправил это; другой материал не интегрирован. – glglgl

+0

может быть проще использовать блокировку 'iter (q.get, None)' и дозорный: 'для i в диапазоне (num_threads): q.put (None)' и присоединяться к потокам вместо ненадежного 'q.task_done (), q.join(), q.is_filling' – jfs

0

Вы можете найти его проще использовать concurrent.futures чем threading, multiprocessing, Queue непосредственно:

#!/usr/bin/env python3 
import socket 
# pip install futures on Python 2.x 
from concurrent.futures import ThreadPoolExecutor as Executor 

hosts = "youtube.com google.com facebook.com yahoo.com live.com".split()*100 
with Executor(max_workers=20) as pool: 
    for results in pool.map(socket.gethostbyname_ex, hosts, timeout=60): 
     print(results) 

Примечание: вы можете легко переключаться с помощью потоков в процессах:

from concurrent.futures import ProcessPoolExecutor as Executor 

Вам это нужно, если gethostbyname_ex() не является потокобезопасным для вашей ОС, например, it might be the case on OSX.

Если вы хотите обрабатывать исключения, которые могут возникнуть в gethostbyname_ex():

import concurrent.futures 

with Executor(max_workers=20) as pool: 
    future2host = dict((pool.submit(socket.gethostbyname_ex, h), h) 
         for h in hosts) 
    for f in concurrent.futures.as_completed(future2host, timeout=60): 
     e = f.exception() 
     print(f.result() if e is None else "{0}: {1}".format(future2host[f], e)) 

Он похож на the example from the docs.

 Смежные вопросы

  • Нет связанных вопросов^_^