2008-12-05 5 views
7

Я экспериментирую с новым модулем многопроцессорности в Python 2.6. Я создаю несколько процессов каждый со своим собственным многопроцессорным экземпляром .JoinableQueue. Каждый процесс генерирует один или несколько рабочих потоков (подклассы threading.Thread), которые совместно используют экземпляр JoinableQueue (передаются через метод __init__ каждого потока). Это, кажется, как правило, работают, но иногда и непредсказуемо терпит неудачу со следующей ошибкой:Python 2.6 multiprocessing.Queue совместим с потоками?

File "C:\Documents and Settings\Brian\Desktop\testscript.py", line 49, in run 
    self.queue.task_done() 
    File "C:\Python26\lib\multiprocessing\queues.py", line 293, in task_done 
    raise ValueError('task_done() called too many times') 
ValueError: task_done() called too many times 

Мои очереди получить() и task_done() вызовы сразу после друг друга, так что они должны быть равны. Анекдотически это происходит, только когда работа, выполняемая между get() и task_done(), ОЧЕНЬ быстро. Вставка маленького time.sleep(0.01), похоже, облегчает проблему.

Любые идеи, что происходит? Могу ли я использовать многопроцессорную очередь с потоками вместо более традиционной (Queue.Queue)?

Спасибо!

-Брайан

+0

Вы можете использовать фрагмент кода, который включает объекты Queue. – jfs 2008-12-05 01:00:47

ответ

2

Вы должны передать очереди объектов в качестве аргументов цели.

Пример из multiprocessing's documentation:

from multiprocessing import Process, Queue 

def f(q): 
    q.put([42, None, 'hello']) 

if __name__ == '__main__': 
    q = Queue() 
    p = Process(target=f, args=(q,)) 
    p.start() 
    print q.get() # prints "[42, None, 'hello']" 
    p.join() 

Queues are thread and process safe.

-1

Спасибо за быстрый ответ. Я передаю экземпляры multiprocessing.Queue в качестве аргументов для каждого процесса, как вы это иллюстрируете. Ошибка возникает в потоках. Я создаю их путем подклассификации threading.Thread и передачи очереди в метод 'init' каждого экземпляра потока. Кажется, это приемлемый способ передать в Queues потоки подклассов. Я думал, что многопроцессорные очереди могут быть несовместимы с потоками (хотя они, предположительно, потокобезопасны).

4

Я еще не экспериментировал с многопроцессорной обработкой в ​​версии 2.6, но я много играл с pyprocessing (как его называли в 2.5).

Я вижу, что вы ищете несколько процессов, каждый из которых порождает множество потоков соответственно.

Поскольку вы используете модуль многопроцессорного, я предлагаю воспользоваться многоэкранным процессом, а не мульти нити подхода, вы ударите меньше проблем, как тупики и т.д.

Создать объект очереди. http://pyprocessing.berlios.de/doc/queue-objects.html

Для создания многопроцессорной среды используйте пул: http://pyprocessing.berlios.de/doc/pool-objects.html, который будет управлять рабочими процессами для вас. Затем вы можете применить асинхронную/синхронную работу с рабочими и также можете добавить обратный вызов для каждого работника, если это необходимо. Но помните, что обратный звонок является общим кодовым блоком и он должен немедленно возвращаться (как указано в документации)

Дополнительная информация: При необходимости создайте менеджера http://pyprocessing.berlios.de/doc/manager-objects.html для управления доступом к объекту очереди. Для этого вам придется сделать объект очереди общим. Но преимущество заключается в том, что после совместного использования и управления вы можете получить доступ к этой общей очереди по всей сети, создав прокси-объекты. Это позволит вам вызывать методы централизованного объекта общей очереди как (по-видимому) собственные методы на любом сетевом узле.

вот пример кода из документации

Можно запустить сервер диспетчера на одной машине, и есть клиенты используют его от других машин (при условии, что брандмауэры, участвующие позволяют это). Выполнение следующих команд создает сервер для общей очереди, которые удаленные клиенты могут использовать:

>>> from processing.managers import BaseManager, CreatorMethod 
>>> import Queue 
>>> queue = Queue.Queue() 
>>> class QueueManager(BaseManager): 
...  get_proxy = CreatorMethod(callable=lambda:queue, typeid='get_proxy') 
... 
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none') 
>>> m.serve_forever() 

Один клиент может получить доступ к серверу следующим образом:

>>> from processing.managers import BaseManager, CreatorMethod 
>>> class QueueManager(BaseManager): 
...  get_proxy = CreatorMethod(typeid='get_proxy') 
... 
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none') 
>>> queue = m.get_proxy() 
>>> queue.put('hello') 

Если вы настаиваете на безопасный резьбовая материал, PEP371 (многопроцессорная обработка) ссылки http://code.google.com/p/python-safethread/