4

Следующий код показывает простой multiprocessing.Process трубопровода с общим словарем списки и очереди задач для различных потребителей:Python multiprocessing.Process ведет себя не детерминированной

import multiprocessing 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_dict): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_dict = result_dict 

    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       # Poison pill means shutdown 
       print('%s: Exiting' % proc_name) 
       self.task_queue.task_done() 
       break 
      print('%s: %s' % (proc_name, next_task)) 

      # Do something with the next_task 
      l = self.result_dict[5] 
      l.append(3) 
      self.result_dict[5] = l 
      # alternative, but same problem 
      #self.result_dict[5] += [3] 

      self.task_queue.task_done() 
     return 

def provide_tasks(tasks, num_worker): 
    low = [ 
     ['w1', 'w2'], 
     ['w3'], 
     ['w4', 'w5'] 
    ] 
    for el in low: 
     tasks.put(el) 
    # Add a poison pill for each worker 
    for i in range(num_worker): 
     tasks.put(None) 

if __name__ == '__main__': 
    num_worker = 3 
    tasks = multiprocessing.JoinableQueue() 
    manager = multiprocessing.Manager() 

    results = manager.dict() 
    lists = [manager.list() for i in range(1, 11)] 
    for i in range(1, 11): 
     results[i] = lists[i - 1] 

    worker = [Consumer(tasks, results) for i in range(num_worker)] 
    for w in worker: 
     w.start() 

    p = multiprocessing.Process(target=provide_tasks, args=(tasks,num_worker)) 
    p.start() 
    # Wait for all of the tasks to finish 
    p.join() 
    print(results) 

При запуске этот пример с Python3.x вы получите разные результаты для результатов dict. Я на самом деле ожидать, что результаты Сыроватского выглядеть

{1: [], 2: [], 3: [], 4: [], 5: [3, 3, 3], 6: [], 7: [], 8: [], 9: [], 10: []} 

Но для некоторых расстрелов это выглядит следующим образом:

{1: [], 2: [], 3: [], 4: [], 5: [3, 3], 6: [], 7: [], 8: [], 9: [], 10: []} 

Может кто-нибудь объяснить мне это поведение? Почему некоторые цифры отсутствуют?

ОБНОВЛЕНО подход решение в соответствии с предложенным ответом:

if next_task is None: 
    with lock: 
     self.result_dict.update(self.local_dict) 
     [...] 

Где замок является manager.Lock() и self.local_dict является defaultdict(list).

Перемещенный замок в соответствии с ответом на вопрос. Также добавлена ​​версия, которая не работает с блокировкой.

# Works 
with lock: 
    l = self.result_dict[x] 
    l.append(3) 
    self.result_dict[x] = l 
self.task_queue.task_done() 

# Doesn't work. Even if I move the lock out of the loop. 
for x in range(1, 10): 
    with lock: 
     l = self.result_dict[x] 
     l.append(3) 
     self.result_dict[x] = l 

Для того, чтобы получить второй пример, чтобы работать, мы должны вызвать join по всем рабочим тоже.

ответ

3

Получение локальной копии списка, изменение его и переназначение его менеджеру dict не является атомной операцией, что создает условие гонки, при котором операция добавления может быть потеряна.

Описан в this python bug report.

l = self.result_dict[5] # <-- race begins 
l.append(3) 
self.result_dict[5] = l # <-- race ends 
+0

Итак, я понимаю причину быть таким: «Сохранение прокси в проксируемого объект, а затем получить доступ к прокси-сервер возвращает копию самого объекта, а не хранимая прокси» * ([источник] (http://bugs.python.org/issue6766)) – Norman

+0

Имеет смысл для меня. Довольно большая ловушка. Я обновил свой вопрос с помощью подхода к решению. Сохраняя значения в локальном словаре и обновляя общий dict только один раз с помощью общей блокировки. Однако такое же поведение, как и раньше. Я предположил, что блокировка блокирует ресурс для других процессов, и как только он освободит других, обновите их локальные результаты в общем общем словаре. Ты видишь проблему? У вас есть рабочее решение для решения проблемы? – user2715478

+0

Откладывание обновления до появления ядовитой таблетки должно только усугубить это, потому что продолжительность гонки увеличивается. Попробуйте использовать блокировку, чтобы защитить те 3 строки, которые я упоминаю в своем ответе: 'with lock: get; присоединять; set'. Таким образом, они становятся атомной операцией. (В зависимости от того, как часто ваши работники это делают, это повлечет за собой некоторые накладные расходы на блокировку.) – Norman