2017-01-12 7 views
3

Я пытаюсь реализовать многопроцессорность для этого цикла. Он не может изменить массив или, похоже, не правильно упорядочивает задания (возвращает массив до выполнения последней функции).Многопроцессорный цикл функции, которая записывает в массив в python

import multiprocessing 
import numpy 


def func(i, array): 
    array[i] = i**2 
    print(i**2) 

def main(n): 
    array = numpy.zeros(n) 

    if __name__ == '__main__': 
     jobs = [] 
     for i in range(0, n): 
      p = multiprocessing.Process(target=func, args=(i, array)) 
      jobs.append(p) 
      p.start() 

    return array 

print(main(10)) 
+0

От [this post] (http://stackoverflow.com/a/15858559/1636276): «Проблема в том, что когда объекты передаются рабочим процессам, они упаковываются с рассолом, отгружаются в другой процесс, где они распаковываются и обрабатываются. Ваши объекты не столько передаются другому процессу, сколько клонированы ». – Tagc

ответ

3

Процессы не делятся памятью, ваша программа изначально создаст массив, полный нулей, а затем запустит 10 процессов, которые вызовут функцию func на копии массива, когда он был впервые создан, но никогда не будет оригинального массива.

Похоже, что вы на самом деле пытаетесь достичь это:

from multiprocessing import Process, Lock 
from multiprocessing.sharedctypes import Array 


def modify_array(index, sharedarray): 
    sharedarray[index] = index ** 2 
    print([x for x in sharedarray]) 


def main(n): 
    lock = Lock() 
    array = Array('i', 10, lock=lock) 
    if __name__ == '__main__': 
     for i in range(0, n): 
      p = Process(target=modify_array, args=(i, array)) 
      p.start() 
      p.join() 
    return list(array) 

main(10) 

Выход:

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 
[0, 1, 0, 0, 0, 0, 0, 0, 0, 0] 
[0, 1, 4, 0, 0, 0, 0, 0, 0, 0] 
[0, 1, 4, 9, 0, 0, 0, 0, 0, 0] 
[0, 1, 4, 9, 16, 0, 0, 0, 0, 0] 
[0, 1, 4, 9, 16, 25, 0, 0, 0, 0] 
[0, 1, 4, 9, 16, 25, 36, 0, 0, 0] 
[0, 1, 4, 9, 16, 25, 36, 49, 0, 0] 
[0, 1, 4, 9, 16, 25, 36, 49, 64, 0] 
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 

Но проблема в том, используя мультипроцессирование ошибочна. Существует множество накладных расходов на создание дополнительного процесса, по сравнению с новым потоком или даже просто однопоточность и использование цикла событий для запуска действий.

Пример использования параллелизма в однопоточном, едином процессе Python может выглядеть следующим образом:

import numpy as np 
from asyncio import get_event_loop, wait, ensure_future 


def modify_array(index, array): 
    array[index] = index ** 2 
    print([x for x in array]) 


async def task(loop, function, index, array): 
    await loop.run_in_executor(None, function, index, array) 


def main(n): 
    loop = get_event_loop() 
    jobs = list() 
    array = np.zeros(10) 
    for i in range(0, n): 
     jobs.append(
      ensure_future(
       task(loop, modify_array, i, array) 
      ) 
     ) 
    loop.run_until_complete(wait(jobs)) 
    loop.close() 

main(10) 

Это популярный шаблон в эти дни, использование asyncio события петель для выполнения задач в параллельны друг другу.Однако, поскольку вы используете библиотеку, такую ​​как Numpy, я задаю вопрос, насколько ценен этот шаблон для вас.

+0

Спасибо за ваш ответ. У меня есть доступ к 30-ядерному компьютеру, и моя настоящая функция содержит интеграл, который занимает около 2 секунд (выполняется тысячи раз). В этом случае накладные расходы будут стоить того или есть еще лучший способ? – Tom

+0

Если задачи запускают процессы во время запуска, которые долгое время работают, тогда накладные расходы могут не быть проблемой, но если вы часто нерестные процессы, я бы сказал, что это не стоит. –

+0

Ваша программа также выдает сообщение об ошибке RuntimeError ('Контур события закрыт') RuntimeError: цикл событий закрыт – Tom

3

Я не использовал multiprocessing раньше, так что я новичок в этом тоже, но после того, как делать небольшое исследование (в основном из thesetwo сообщений), я думаю, что я частично удалось решить проблему с этим код:

import multiprocessing 
import numpy 


def func(i, array, connection): 
    squared_value = i ** 2 
    array[i] = squared_value 
    print(squared_value) 

    connection.send(array) 


def main(n): 
    array = numpy.zeros(n) 

    for i in range(0, n): 
     recv_end, send_end = multiprocessing.Pipe(False) 
     p = multiprocessing.Process(target=func, args=(i, array, send_end)) 
     p.start() 
     p.join() 
     array = recv_end.recv() 

    return array 


if __name__ == '__main__': 
    print(main(10)) 

Выход

0 
1 
4 
9 
16 
25 
36 
49 
64 
81 
[ 0. 1. 4. 9. 16. 25. 36. 49. 64. 81.] 

причина, почему этот подход изменяет ар лучи и ваш не объясняются в this answer that I referenced in the comments:

The problem is that when the objects are passed to the worker processes, they are packed up with pickle, shipped to the other process, where they are unpacked and worked on. Your objects aren't so much passed to the other process, as cloned. You don't return the objects, so the cloned object are happily modified, and then thrown away.

Есть несколько вещей, которые я должен отметить мое (частично) решения:

  • Этой реализация работает намного медленнее, чем просто создавая этот список обычным способом (через один поток). Это, скорее всего, связано с дополнительными накладными расходами на создание новых процессов и сортировку данных между ними.

  • Из-за характера вашей проблемы (с заданиями, которые каждый изменяет массив), каждое задание обязательно должно принимать в качестве входных данных результат предыдущего задания. Из-за этого ограничения я не считаю, что возможно, что задания выполняются одновременно, что приводит к поражению точки многопроцессорности.

Что касается последнего из этих двух точек, я попробовать вариант, где func возвращает функцию, которая принимает массив и возвращает модифицированную версию. Это позволило бы выполнять задания одновременно, но, к сожалению, это не похоже на то, что функции можно мариновать.

+0

Престижность для альтернативы моему ответу. Побей меня. Я делал те же наблюдения относительно скорости. –

+0

@VasiliSyrakis Спасибо. Приятно знать, что «многопроцессорность» определяет типы, которые могут использоваться совместно между процессами. +1 за это – Tagc

+0

Спасибо за ваш ответ. Я тестирую ваш ответ с добавлением цикла к функции для увеличения времени обработки. (sum = 0 // для j в диапазоне (0, 10000000): // sum = sum + j). Он по-прежнему выполняет функции последовательно в одном ядре - мне нужно сделать что-то еще, чтобы заставить его запускать процесс на каждом ядре процессора? – Tom