2

Я использую простые поточные модули для выполнения параллельных заданий. Теперь я хотел бы воспользоваться преимуществами одновременных модулей фьючерсов. Может ли кто-нибудь поставить мне пример использования очереди с параллельной библиотекой?Как использовать очередь с параллельным будущим ThreadPoolExecutor в python 3?

Я получаю TypeError: объект «очередь» не итерацию я не знаю, как перебирать очередей

фрагмент кода:

def run(item): 
     self.__log.info(str(item)) 
     return True 
<queue filled here> 

with concurrent.futures.ThreadPoolExecutor(max_workers = 100) as executor: 
     furtureIteams = { executor.submit(run, item): item for item in list(queue)} 
     for future in concurrent.futures.as_completed(furtureIteams): 
      f = furtureIteams[future] 
      print(f) 
+0

Обычно вы используете очередь для проблемы производителя-потребителя http://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem – User

+0

Я ищу пример кода для чтения очереди с помощью threadpoolexecutor – user2433024

ответ

3

Я хотел бы предложить что-то вроде этого:

def run(queue): 
     item = queue.get() 
     self.__log.info(str(item)) 
     return True 
<queue filled here> 
workerThreadsToStart = 10 
with concurrent.futures.ThreadPoolExecutor(max_workers = 100) as executor: 
     furtureIteams = { executor.submit(run, queue): index for intex in range(workerThreadsToStart)} 
     for future in concurrent.futures.as_completed(furtureIteams): 
      f = furtureIteams[future] 
      print(f) 

Проблема, с которой вы столкнетесь, заключается в том, что очередь считается бесконечной и как среда для развязки потоков, которые помещают что-то в очередь и потоки t получить элементы из очереди.

Когда

  1. у вас есть конечное число элементов или
  2. вы вычислить все элементы сразу

, а затем обрабатывать их параллельно, очередь не имеет никакого смысла. ThreadPoolExecutor делает очереди в этих случаях устаревшими.

Я посмотрел на источник ThreadPoolExecutor:

def submit(self, fn, *args, **kwargs): # line 94 
    self._work_queue.put(w) # line 102 

очереди используется внутри.

+2

+1 здесь очередь может быть избыточной. В общем случае вы можете преобразовать очередь в итерабельную, используя функцию с двумя аргументами 'iter()': для элемента в iter (queue.get, sentinel): # получить элементы до тех пор, пока не будет найдена дознание ' – jfs