Следующий код показывает простой multiprocessing.Process трубопровода с общим словарем списки и очереди задач для различных потребителей:Python multiprocessing.Process ведет себя не детерминированной
import multiprocessing
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_dict):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_dict = result_dict
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print('%s: Exiting' % proc_name)
self.task_queue.task_done()
break
print('%s: %s' % (proc_name, next_task))
# Do something with the next_task
l = self.result_dict[5]
l.append(3)
self.result_dict[5] = l
# alternative, but same problem
#self.result_dict[5] += [3]
self.task_queue.task_done()
return
def provide_tasks(tasks, num_worker):
low = [
['w1', 'w2'],
['w3'],
['w4', 'w5']
]
for el in low:
tasks.put(el)
# Add a poison pill for each worker
for i in range(num_worker):
tasks.put(None)
if __name__ == '__main__':
num_worker = 3
tasks = multiprocessing.JoinableQueue()
manager = multiprocessing.Manager()
results = manager.dict()
lists = [manager.list() for i in range(1, 11)]
for i in range(1, 11):
results[i] = lists[i - 1]
worker = [Consumer(tasks, results) for i in range(num_worker)]
for w in worker:
w.start()
p = multiprocessing.Process(target=provide_tasks, args=(tasks,num_worker))
p.start()
# Wait for all of the tasks to finish
p.join()
print(results)
При запуске этот пример с Python3.x вы получите разные результаты для результатов dict. Я на самом деле ожидать, что результаты Сыроватского выглядеть
{1: [], 2: [], 3: [], 4: [], 5: [3, 3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}
Но для некоторых расстрелов это выглядит следующим образом:
{1: [], 2: [], 3: [], 4: [], 5: [3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}
Может кто-нибудь объяснить мне это поведение? Почему некоторые цифры отсутствуют?
ОБНОВЛЕНО подход решение в соответствии с предложенным ответом:
if next_task is None:
with lock:
self.result_dict.update(self.local_dict)
[...]
Где замок является manager.Lock()
и self.local_dict является defaultdict(list)
.
Перемещенный замок в соответствии с ответом на вопрос. Также добавлена версия, которая не работает с блокировкой.
# Works
with lock:
l = self.result_dict[x]
l.append(3)
self.result_dict[x] = l
self.task_queue.task_done()
# Doesn't work. Even if I move the lock out of the loop.
for x in range(1, 10):
with lock:
l = self.result_dict[x]
l.append(3)
self.result_dict[x] = l
Для того, чтобы получить второй пример, чтобы работать, мы должны вызвать join
по всем рабочим тоже.
Итак, я понимаю причину быть таким: «Сохранение прокси в проксируемого объект, а затем получить доступ к прокси-сервер возвращает копию самого объекта, а не хранимая прокси» * ([источник] (http://bugs.python.org/issue6766)) – Norman
Имеет смысл для меня. Довольно большая ловушка. Я обновил свой вопрос с помощью подхода к решению. Сохраняя значения в локальном словаре и обновляя общий dict только один раз с помощью общей блокировки. Однако такое же поведение, как и раньше. Я предположил, что блокировка блокирует ресурс для других процессов, и как только он освободит других, обновите их локальные результаты в общем общем словаре. Ты видишь проблему? У вас есть рабочее решение для решения проблемы? – user2715478
Откладывание обновления до появления ядовитой таблетки должно только усугубить это, потому что продолжительность гонки увеличивается. Попробуйте использовать блокировку, чтобы защитить те 3 строки, которые я упоминаю в своем ответе: 'with lock: get; присоединять; set'. Таким образом, они становятся атомной операцией. (В зависимости от того, как часто ваши работники это делают, это повлечет за собой некоторые накладные расходы на блокировку.) – Norman