2013-04-03 1 views
2

С помощью некоторых членов здесь я установил код, который работает в Python, и оценивает функцию, которая принимает два огромных значения np.arrays.Параллелизация цикла в cython

векторизованная версия работает параллельно еще массово отнимает много времени и фактор ~ 50 медленнее, чем эталонная программа, написанной в серийном фортраном ...

Я хотел бы использовать цикл Cython вместо, для которого я могу использовать OpenMP или MPI для параллелизации. Идея в C++ будет, как:

#pragma omp parallel for 
for (i=0;i<np1;i++){ 
    for (i=0;i<np2;i++){ 
    double dist = sph(coord1_particle1,coord1_particle2,coord2_particle1,coord2_particle2) 
    int bin=binning_function(dist) 
    hist_array[bin]++ 
    } 
} 

Любые идеи полностью приветствуются. Вот версия Python:

#a is an array containing two coordinates of two objects 
def dist_vec(a): # a like [[array1,array2,array2,array2],[],[]...] 
    return sph(a[0],a[1],a[2],a[3]) # sph operates on coordinates 

def vec_chunk(array_ab, bins) : 
    dist = dist_vec(array_ab) 
    hist, _ = np.histogram(dist, bins=bins) 
    return hist 


def mp_dist(array_a,array_b, d, bins): #d chunks AND processes 
    def worker(array_ab, out_q): 
     """ push result in queue """ 
     outdict = vec_chunk(array_ab, bins) 
     out_q.put(outdict) 
    # Each process will get 'chunksize' nums and a queue to put his out 
    out_q = mp.Queue() 
    a = np.swapaxes(array_a, 0 ,1) 
    b = np.swapaxes(array_b, 0 ,1) 
    array_size_a=len(array_a)-(len(array_a)%d) 
    array_size_b=len(array_b)-(len(array_b)%d) 
    a_chunk = array_size_a/d 
    b_chunk = array_size_b/d 
    procs = [] 
    '''prepare arrays for mp''' 
    array_ab = np.empty((4, a_chunk, b_chunk)) 
    for j in xrange(d): 
     for k in xrange(d): 
     array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None] 
     array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)] 
     p= mp.Process(target=worker, args=(array_ab, out_q)) 
     p.start() 
     procs.append(p) 
    for pro in procs: 
     pro.join() 
    # Collect all results into a single result dict. 
    resultarray = np.empty(len(bins)-1) 
    for i in range(d): 
     resultarray+=out_q.get() 
     #resultdict.update(out_q.get()) 
    return resultarray 

bins = np.logspace(-3,1, num=25) #prepare x-axis for histogram 
start_time = time() 
hist_data = mp_dist(DATA,sim,10,bins) 
print 'Total Time Elaspsed: ', time() - start_time 

ответ

4

Следующий код ~ 6 раз быстрее, чем оригинал: Он использует быстрее гистограммирование код из http://code.google.com/p/astrolibpy/source/browse/my_utils/quick_hist.py (потому что np.histogram слишком медленно для бункеров одинаковой длины) The новый код не создает много процессов, использует multiprocessing.pool, а также избегает обширного копирования данных между процессами.

Остальная часть производительности может быть получена путем перезаписи функции расстояния в цитоне. Или еще лучше, переписывание dist_vec() в Cython или scipy.weave (смотрите пример в коде quick_hist)

import numpy as np,multiprocessing as mp 
from time import time 
import quick_hist 
def sph(a, b, c, d): 
    return numexpr.evaluate('log(((a - c)**2 + (b - d)**2)**.5)') 

def dist_vec(a,b): 
    return sph(a[:,0][:, None], a[:,1][:, None], b[:,0][None, :], b[:,1][None, :]) 

def vec_chunk(a, b, bins) : 
    dist = dist_vec(a, b).flatten() 
    hist = quick_hist.quick_hist((dist,), [(bins[0], bins[-1])], [len(bins)]) 
    return hist 

class si: 
    # singleton to share read-only data between processes 
    a = None 
    b = None 
    step = None 
    bins = None 

def func(l1): 
    return vec_chunk(si.a[l1:l1+si.step,:], si.b, si.bins) 

def mp_dist(array_a,array_b, d, bins): #d chunks 
    nproc = 8 # n processes 
    si.a = array_a 
    si.b = array_b 
    si.step = d 
    si.bins = bins 
    nx = array_a.shape[0] 
    lefts = np.arange(0, nx, d) #left edges of the chunks 
    pool = mp.Pool(nproc) 
    results = pool.map(func, lefts) 
    results = np.array(results).sum(axis=0) 
    pool.close() 
    pool.join() 
    return results 

if __name__=='__main__': 
    bins = np.logspace(-3,1, num=25) #prepare x-axis for histogram 
    start_time = time() 
    n1 = 10000 
    n2 = 10000 
    DATA = np.random.uniform(size=(n1, 2)) 
    sim = np.random.uniform(size=(n2, 2)) 
    chunksize = 10 
    hist_data = mp_dist(DATA, sim, chunksize, bins) 

    print 'Total Time Elaspsed: ', time() - start_time 
+0

спасибо, ваша версия даже ~ 12 раз быстрее. У меня все еще есть вопросы. Могу ли я реализовать функцию, написанную на cython? Я подумывал написать эту функцию в C-Code. Можете ли вы объяснить мне, что делает один сингл? – madzone

+0

Я думаю, что люди обычно используют cython, чтобы фактически уйти от векторизации и скорее закодировать циклы. AFAIK это обычно быстрее в cython (хотя я не пишу много кода на языке cython). Что касается синглтона - это, по сути, структура или класс, содержащие переменные, но здесь главное, что он будет виден в дочерних процессах, созданных mp.Pool(). И память даже не нужно копировать. –

+0

Вот почему мне было любопытно спросить. Если я вычисляю функцию 'sph' в цикле (скажем, для каждой комбинации), то вектор-векторизация' dist_vec' бесполезна, не так ли? – madzone