2016-11-23 3 views
1

Учитывая следующий класс:Могу ли я предположить, что мои потоки выполняются, когда threading.active_count() возвращает 1?

from abc import ABCMeta, abstractmethod 
from time import sleep 
import threading 
from threading import active_count, Thread 

class ScraperPool(metaclass=ABCMeta): 
    Queue = [] 
    ResultList = [] 

    def __init__(self, Queue, MaxNumWorkers=0, ItemsPerWorker=50): 
     # Initialize attributes 
     self.MaxNumWorkers = MaxNumWorkers 
     self.ItemsPerWorker = ItemsPerWorker 
     self.Queue = Queue # For testing purposes. 

    def initWorkerPool(self, PrintIDs=True): 
     for w in range(self.NumWorkers()): 
      Thread(target=self.worker, args=(w + 1, PrintIDs,)).start() 
      sleep(1) # Explicitly wait one second for this worker to start. 

    def run(self): 
     self.initWorkerPool() 

     # Wait until all workers (i.e. threads) are done. 
     while active_count() > 1: 
      print("Active threads: " + str(active_count())) 
      sleep(5) 

     self.HandleResults() 

    def worker(self, id, printID): 
     if printID: 
      print("Starting worker " + str(id) + ".") 

     while (len(self.Queue) > 0): 
      self.scraperMethod() 

     if printID: 
      print("Worker " + str(id) + " is quiting.") 

     # Todo Kill is this Thread. 

     return 

    def NumWorkers(self): 
     return 1 # Simplified for testing purposes. 

    @abstractmethod 
    def scraperMethod(self): 
     pass 

class TestScraper(ScraperPool): 
    def scraperMethod(self): 
     # print("I am scraping.") 
     # print("Scraping. Threads#: " + str(active_count())) 
     temp_item = self.Queue[-1] 
     self.Queue.pop() 

     self.ResultList.append(temp_item) 

    def HandleResults(self): 
     print(self.ResultList) 

ScraperPool.register(TestScraper) 

scraper = TestScraper(Queue=["Jaap", "Piet"]) 
scraper.run() 
print(threading.active_count()) 
# print(scraper.ResultList) 

Когда все нити сделаны, есть еще один активный поток - threading.active_count() на последней строке получает меня этот номер.

Активная нить <_MainThread(MainThread, started 12960)> - как напечатана с threading.enumerate().

Могу ли я предположить, что все мои темы выполняются, когда active_count() == 1? Или, например, импортированные модули запускают дополнительные потоки, так что мои потоки фактически выполняются, когда active_count() > 1 - также условие для цикла, который я использую в методе run.

+0

'class TestScraper():' отсутствует? – cat

+0

Чтобы все было просто, я отправил часть кода. Теперь я добавил ссылку pastebin на полный скрипт. – user2693053

+0

Код, который вы публикуете, должен быть минимальным, полным и поддающимся проверке ([mcve]), и это, похоже, пропустят импорт и определения – cat

ответ

2

Вы можете предположить, что ваши нити сделаны, когда active_count() достигает 1. Проблема не в том, если какой-либо другой модуль создает поток, вы не получите 1. Вы должны управлять потоками явно.

Пример: вы можете поместить потоки в список и присоединиться к ним по одному. Соответствующие изменения в вашем коде:

def __init__(self, Queue, MaxNumWorkers=0, ItemsPerWorker=50): 
    # Initialize attributes 
    self.MaxNumWorkers = MaxNumWorkers 
    self.ItemsPerWorker = ItemsPerWorker 
    self.Queue = Queue # For testing purposes. 
    self.WorkerThreads = [] 

def initWorkerPool(self, PrintIDs=True): 
    for w in range(self.NumWorkers()): 
     thread = Thread(target=self.worker, args=(w + 1, PrintIDs,)) 
     self.WorkerThreads.append(thread) 
     thread.start() 
     sleep(1) # Explicitly wait one second for this worker to start. 

def run(self): 
    self.initWorkerPool() 

    # Wait until all workers (i.e. threads) are done. Waiting in order 
    # so some threads further in the list may finish first, but we 
    # will get to all of them eventually 
    while self.WorkerThreads: 
     self.WorkerThreads[0].join() 

    self.HandleResults() 
+0

Я изначально отслеживал активные скребки в целочисленном 'scraperpool.ActiveWorkersCount', который увеличивался/уменьшался, когда рабочий запускался/останавливался. Затем я столкнулся с 'threading.active_count()' и попытался использовать этот метод. Является ли мой оригинальный подход тем, что вы имеете в виду под «управлять своими потоками»? – user2693053

+0

он подразумевает, что вы должны присоединиться к ним явно .... – user3012759

+0

Я не думаю, что я все время понимаю 'threading.join()', мне нужно будет изучить это. Не могли бы вы привести пример? – user2693053

2

согласно docsactive_count() включает в себя основной поток, так что если вы в 1, то вы, скорее всего, сделано, но если у вас есть еще один источник новых потоков в вашей программе, то вы можете сделать, прежде чем active_count() хиты 1.

Я рекомендую использовать явный join метод вашего ScraperPool и отслеживание ваших работников и явно соединив их в основной поток при необходимости вместо проверки того, что вы сделали с active_count() вызовов.

Также помните о GIL ...

+0

Это правда, что если вы на 1, все готово. Но если какой-либо другой модуль создает поток, вы никогда не получите 1. Это то, о чем спрашивает OP. – tdelaney

+0

@tdelaney Вот что я сказал ... – user3012759