2016-06-24 4 views
1

У меня возникла проблема использования многопроцессорности в Python, чтобы рассчитать результаты LCA для всех функциональных блоков в базе данных ecoinvent v3.2 для нескольких итераций.Почему работники умирают от проблем с памятью при использовании многопроцессорности в Python для выполнения LCA-вычислений?

Код следующее:

for worker_id in range(CPUS): 

    # Create child processes that can work apart from parent process 
    child = mp.Process(target=worker_process, args=(projects.current, output_dir, worker_id, activities, ITERATIONS, status)) 
    workers.append(child) 
    child.start() 
print(workers) 

while any(i.is_alive() for i in workers): 
    time.sleep(0.1) 
    while not status.empty(): 
     # Flush queue of progress reports 
     worker, completed = status.get() 
     progress[worker] = completed 
    progbar.update(sum(progress.values())) 
progbar.finish() 

Определение функции worker_process следующим образом:

def worker_process(project, output_dir, worker_id, activities, iterations, progress_queue): 

# Project is string; project name in Brightway2 
# output_dir is a string 
# worker_id is an integer 
# activities is a list of dictionaries 
# iterations is an integer 
# progress_queue is a Queue where we can report progress to parent process 

projects.set_current(project, writable=False) 

lca = DirectSolvingPVLCA(activities[0]) 
lca.load_data() 
samples = np.empty((iterations, lca.params.shape[0])) 
supply_arrays = np.empty((iterations, len(activities), len(lca.product_dict))) 

for index in range(iterations): 

    lca.rebuild_all() 
    samples[index, :] = lca.sample 
    lca.decompose_technosphere() 
    for act_index, fu in enumerate(activities): 
     lca.build_demand_array(fu) 
     supply_arrays[index, act_index, :] = lca.solve_linear_system() 
    progress_queue.put((worker_id, index)) 

Наблюдаемые проблемы:

  1. В течение более двух рабочих, все за исключением двух, сразу же от MemoryError (см. ниже).

  2. Для двух оставшихся в живых работников код работает нормально для 10, 100 или 5000 функциональных блоков, но по мере того как мы просим всех FU, он ломается и работает в том же MemoryError.

Это MemoryError происходит для каждого процесса X:

Process Process-X: 
    Traceback (most recent call last): 
     File "C:\bw2-python\envs\bw2\lib\multiprocessing\process.py", line 254, in_bootstrap 
     self.run()  
     File "C:\bw2-python\envs\bw2\lib\multiprocessing\process.py", line 93, in run 
     self._target(*self._args, **self._kwargs) 
     File "C:\test\Do all the calculations.py", line 49, in worker_process 
     supply_arrays = np.empty((iterations, len(activities), len(lca.product_dict))) 
    MemoryError 

Мои вопросы:

  • Почему это происходит?

  • Как это можно исправить?

ответ

1

У вас заканчивается память, потому что вы используете слишком много памяти.

При выделении нового массива с использованием:

np.empty((iterations, len(activities), len(lca.product_dict))) 

И activities и lca.product_dict каждый имеет длину, скажем, 10000, вы используете 10,000 * 10,000 * 8 (предполагается, что по умолчанию поплавок 64 бита, или 8 байтов) = 800 МБ оперативной памяти на итерацию и на рабочий процесс.

Простым решением является работа на сервере с большим количеством ОЗУ.

Альтернативы создания этих больших массивов в памяти включают в себя:

В любом случае вам необходимо будет протестировать наиболее эффективные способы записи и чтения данных для конкретного рабочего процесса и ОС.

+0

Благодарим вас за ответ. Кажется, что преобразование данных в полуточность поплавка в числовых файлах работало! Но я продолжаю изучать, что является лучшим вариантом для оптимизации времени и памяти. – WhiteLamb