2016-02-23 12 views
1

Я пытаюсь создать простой шаблон производителя/потребителя в Python, используя multiprocessing. Он работает, но он висит на poll.join(). ВыходMultiprocesing pool.join() зависает при некоторых обстоятельствах

from multiprocessing import Pool, Queue 

que = Queue() 


def consume(): 
    while True: 
     element = que.get() 
     if element is None: 
      print('break') 
      break 
    print('Consumer closing') 


def produce(nr): 
    que.put([nr] * 1000000) 
    print('Producer {} closing'.format(nr)) 


def main(): 
    p = Pool(5) 
    p.apply_async(consume) 
    p.map(produce, range(5)) 
    que.put(None) 
    print('None') 
    p.close() 
    p.join() 


if __name__ == '__main__': 
    main() 

Пример:

~/Python/Examples $ ./multip_prod_cons.py 
Producer 1 closing 
Producer 3 closing 
Producer 0 closing 
Producer 2 closing 
Producer 4 closing 
None 
break 
Consumer closing 

Однако, он отлично работает, когда я изменить одну строку:

que.put([nr] * 100) 

Это 100% воспроизводимые на системе Linux работает Python 3.4.3 или Python 2.7.10. Я что-то упускаю?

ответ

1

Здесь довольно много путаницы. То, что вы пишете, не сценарий производителя/потребителя, а беспорядок, который неправильно использует другой шаблон, обычно называемый «пул работников».

Пул рабочих шаблонов - это приложение производителя/потребителя, в котором есть один производитель, который планирует работу и многие потребители, которые ее потребляют. В этом образце владелец Pool оказался продюсером, в то время как рабочие будут потребителями.

В вашем примере вместо этого у вас есть гибридное решение, в котором один рабочий становится потребителем, а другие действуют как нечто среднее. Вся конструкция очень неэффективна, дублирует большую часть логики, уже предоставленной Pool и, что более важно, очень подвержена ошибкам. То, от чего вы в конечном итоге страдаете, - это Deadlock.

Ввод объекта в multiprocessing.Queue является асинхронной. Он блокируется, только если Queue заполнен, а ваш Queue имеет бесконечный размер.

Это означает, что ваша функция produce немедленно возвращается, поэтому звонок p.map не блокирует вас, как вы ожидаете. Соответствующий рабочий процесс вместо этого ожидает, пока фактическое сообщение не пройдет через Pipe, который Queue использует в качестве канала связи.

Что происходит дальше, что вы преждевременно прекратить ваш потребитель, как вы выразились в Queue «сообщение» None который доставляется перед всеми списками вашей produce функции Создаваемой правильно протолкнула Pipe.

Вы замечаете проблему, как только вы звоните p.join, но реальная ситуация следующая.

  • p.join звонок ожидает завершения всех рабочих процессов.
  • рабочие процессы ждут, когда большие списки пройдут, если QueuePipe.
  • как потребительский рабочий давно ушел, никто не истощает Pipe, который явно заполнен.

Проблема не показывает, достаточно ли достаточно, чтобы ваши списки были достаточно малы, чтобы отправить сообщение о завершении на функцию consume.

+0

Большое спасибо за ответ. Я согласен с большинством пунктов здесь, но мне любопытно. Я реальное приложение, производители делают много вычислительно интенсивной работы, прежде чем результат будет перенесен в очередь, а потребитель сохранит результаты в базе данных. Можете ли вы объяснить немного больше: «Весь дизайн очень неэффективен, дублирует большую часть логики, уже предоставленной пулом»? – Fenikso

+0

«Пул» уже создает внутреннюю «очередь», которая использует для отправки Заданий Рабочим. Рабочие - это просто процессы, ожидающие во внутренней «очереди» и вызывающие данную функцию, когда запланирована новая задача. В вашем примере вы используете еще одну «Очередь», которую контролирует один из Рабочих, чтобы отправить задание другому. Вы дважды выполняете всю работу :) – noxdafox

+0

Правильный дизайн позволил бы вашим Работникам выполнять интенсивно вычислительную работу и возвратите результаты вызывающему, который затем сохранит результаты в БД. Ваш родительский процесс будет действовать как диспетчер планировщика и результатов. В случае, если действие хранилища БД вызывает беспокойство из-за тяжелых операций ввода-вывода, вы можете передать результаты в «Пул» потоков, позволяя масштабировать как по аспектам ЦП, так и по вводу/выводу вашего обслуживания. – noxdafox