2

Рассмотрим следующий пример:Одновременно чтение Numpy массивы в параллельном

fine = np.random.uniform(0,100,10) 
fine[fine<20] = 0 # introduce some intermittency 
coarse = np.sum(fine.reshape(-1,2),axis=1) 

fine является таймсерия величин (например, объем осадков). coarse - это те же самые таймеры, но с разрешенным пополам разрешением, поэтому каждые 2 временных значения в fine агрегируются до одного значения в coarse.

Я тогда заинтересован в взвешивании, который определяет пропорцию величины coarse, что соответствует каждому в fine временного шага для случаев, когда значение coarse больше нуля.

def w_xx(fine, coarse): 
    weights = [] 
    for i, val in enumerate(coarse): 
     if val > 0: 
      w = fine[i*2:i*2+2]/val # returns both w1 and w2, w1 is 1st element, w2 = 1-w1 is second 
      weights.append(w) 
    return np.asarray(weights) 

Так w_xx(fine,coarse) будет возвращать массив формы 5,2 где элементы axis=1 являются веса fine для значения coarse.

Все это отлично подходит для небольших хранилищ, но я выполняю этот анализ на массивах размером ~ 60 тыс. От fine, плюс в цикле из 300 итераций.

Я пытаюсь выполнить этот запуск параллельно, используя библиотеку multiprocessing в Python2.7, но мне не удалось далеко продвинуться. Мне нужно одновременно считывать оба таймера, чтобы получить соответствующие значения fine для каждого значения в coarse, плюс работать только для значений выше 0, что и требует мой анализ.

Я был бы признателен за предложения по лучшему способу сделать это. Я предполагаю, что если я могу определить функцию сопоставления для использования с Pool.map в multiprocessing, я должен уметь распараллелить это? Я только начинал с multiprocessing, так что я не знаю, есть ли другой способ?

спасибо.

ответ

2

Вы можете достичь того же результата в векторизованного форме просто делать:

>>> (fine/np.repeat(coarse, 2)).reshape(-1, 2) 

, то вы можете отфильтровать строки, которые coarse равен нулю, с помощью np.isfinite, так как если coarse равен нулю выход либо inf или nan.

0

Отлично! Я не знал о np.repeat, спасибо вам большое.

Чтобы ответить на мой первоначальный вопрос в форме он был представлен, я тогда также удалось сделать эту работу с multiprocessing:

import numpy as np  
from multiprocessing import Pool 

fine = np.random.uniform(0,100,100000) 
fine[fine<20] = 0 
coarse = np.sum(fine.reshape(-1,2),axis=1) 

def wfunc(zipped): 
    return zipped[0]/zipped[1] 

def wpar(zipped, processes): 
    p = Pool(processes) 
    calc = np.asarray(p.map(wfunc, zip(fine,np.repeat(coarse,2)))) 

    p.close() 
    p.join() 

    return calc[np.isfinite(calc)].reshape(-1,2) 

Однако предложение от @ behzad.nouri, очевидно, лучше:

def w_opt(fine, coarse): 
    w = (fine/np.repeat(coarse, 2)) 
    return w[np.isfinite(w)].reshape(-1,2)  

#using some iPython magic 
%timeit w_opt(fine,coarse) 
1000 loops, best of 3: 1.88 ms per loop 

%timeit w_xx(fine,coarse) 
1 loops, best of 3: 342 ms per loop 

%timeit wpar(zip(fine,np.repeat(coarse,2)),6) #I've 6 cores at my disposal 
1 loops, best of 3: 1.76 s per loop 

Еще раз спасибо!

1

В дополнении к Numpy выражению, предложенному @ behzad.nouri, вы можете использовать компилятор pythran сорвать дополнительные ускорения:

$ cat w_xx.py 
#pythran export w_xx(float[], float[]) 
import numpy as np 

def w_xx(fine, coarse): 
    w = (fine/np.repeat(coarse, 2)) 
    return w[np.isfinite(w)].reshape(-1,2) 
$ python -m timeit -s 'import numpy as np; fine = np.random.uniform(0,100,100000); fine[fine<20] = 0; coarse = np.sum(fine.reshape(-1,2),axis=1); from w_xx import w_xx' 'w_xx(fine, coarse)' 
1000 loops, best of 3: 1.5 msec per loop 
$ pythran w_xx.py -fopenmp -march=native # yes, this generates parallel code 
$ python -m timeit -s 'import numpy as np; fine = np.random.uniform(0,100,100000); fine[fine<20] = 0; coarse = np.sum(fine.reshape(-1,2),axis=1); from w_xx import w_xx' 'w_xx(fine, coarse)' 
1000 loops, best of 3: 867 usec per loop 

отказ от ответственности: Я действительно один из pythran разработчика.