2016-07-05 15 views
3

Я пытаюсь перенести часть моего R-кода на Джулию; В основном я переписан следующий R код Julia:Julia pmap performance

library(parallel) 

eps_1<-rnorm(1000000) 
eps_2<-rnorm(1000000) 

large_matrix<-ifelse(cbind(eps_1,eps_2)>0,1,0) 
matrix_to_compare = expand.grid(c(0,1),c(0,1)) 
indices<-seq(1,1000000,4) 
large_matrix<-lapply(indices,function(i)(large_matrix[i:(i+3),])) 

function_compare<-function(x){ 
    which((rowSums(x==matrix_to_compare)==2) %in% TRUE) 
} 

> system.time(lapply(large_matrix,function_compare)) 
    user system elapsed 
38.812 0.024 38.828 
> system.time(mclapply(large_matrix,function_compare,mc.cores=11)) 
    user system elapsed 
63.128 1.648 6.108 

Как можно заметить, я получаю значительную скорость вверх при переходе от одного ядра до 11. Теперь я пытаюсь сделать то же самое в Джулии:

#Define cluster: 

addprocs(11); 

using Distributions; 
@everywhere using Iterators; 
d = Normal(); 

eps_1 = rand(d,1000000); 
eps_2 = rand(d,1000000); 


#Create a large matrix: 
large_matrix = hcat(eps_1,eps_2).>=0; 
indices = collect(1:4:1000000) 

#Split large matrix: 
large_matrix = [large_matrix[i:(i+3),:] for i in indices]; 

#Define the function to apply: 
@everywhere function function_split(x) 
    matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4))); 
    matrix_to_compare = matrix_to_compare.>0; 
    find(sum(x.==matrix_to_compare,2).==2) 
end 

@time map(function_split,large_matrix) 
@time pmap(function_split,large_matrix) 

    5.167820 seconds (22.00 M allocations: 2.899 GB, 12.83% gc time) 
    18.569198 seconds (40.34 M allocations: 2.082 GB, 5.71% gc time) 

Как видно, я не получаю ускорения с pmap. Может быть, кто-то может предложить альтернативы.

+1

'big_matrix' is' 250000-element Array {Any, 1}: 'Возможно, это проблема? – daycaster

+0

Я действительно не знаю, что я очень новичок в Julia – Vitalijs

+0

On Julia 0.4.6 Я получаю следующие результаты с 'addprocs (3)': '4.173674 секунд (22.97 M ассигнований: 2.943 GB, 14.57% gc time)' и '0.795733 секунды (292.07 k распределений: 12.377 MB, 0.83% gc time)'. Также тип 'large_matrix' -' Array {BitArray {2}, 1} '. – tim

ответ

1

Я думаю, что некоторые проблемы здесь состоят в том, что @parallel и @pmap не всегда обрабатывают данные о перемещениях и от рабочих очень хорошо. Таким образом, они, как правило, лучше всего работают в ситуациях, когда то, что вы выполняете, не требует очень большого перемещения данных вообще. Я также подозреваю, что есть что-то, что можно было бы сделать, чтобы улучшить их производительность, но я не уверен в деталях.

Для ситуаций, в которых вам нужно больше данных перемещаться, лучше всего использовать опции, которые непосредственно вызывают функции для рабочих, причем эти функции затем обращаются к объектам в пространстве памяти этих рабочих. Ниже приведен один пример, который ускоряет вашу работу с помощью нескольких сотрудников. Он использует, пожалуй, самый простой вариант: @everywhere, но @spawn, remotecall() и т. Д. Также стоит учитывать, в зависимости от вашей ситуации.

addprocs(11); 

using Distributions; 
@everywhere using Iterators; 
d = Normal(); 

eps_1 = rand(d,1000000); 
eps_2 = rand(d,1000000); 

#Create a large matrix: 
large_matrix = hcat(eps_1,eps_2).>=0; 
indices = collect(1:4:1000000); 

#Split large matrix: 
large_matrix = [large_matrix[i:(i+3),:] for i in indices]; 

large_matrix = convert(Array{BitArray}, large_matrix); 

function sendto(p::Int; args...) 
    for (nm, val) in args 
     @spawnat(p, eval(Main, Expr(:(=), nm, val))) 
    end 
end 

getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm))) 

@everywhere function function_split(x::BitArray) 
    matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4))); 
    matrix_to_compare = matrix_to_compare.>0; 
    find(sum(x.==matrix_to_compare,2).==2) 
end 


function distribute_data(X::Array, WorkerName::Symbol) 
    size_per_worker = floor(Int,size(X,1)/nworkers()) 
    StartIdx = 1 
    EndIdx = size_per_worker 
    for (idx, pid) in enumerate(workers()) 
     if idx == nworkers() 
      EndIdx = size(X,1) 
     end 
     @spawnat(pid, eval(Main, Expr(:(=), WorkerName, X[StartIdx:EndIdx]))) 
     StartIdx = EndIdx + 1 
     EndIdx = EndIdx + size_per_worker - 1 
    end 
end 

distribute_data(large_matrix, :large_matrix) 


function parallel_split() 
    @everywhere begin 
     if myid() != 1 
      result = map(function_split,large_matrix); 
     end 
    end 
    results = cell(nworkers()) 
    for (idx, pid) in enumerate(workers()) 
     results[idx] = getfrom(pid, :result) 
    end 
    vcat(results...) 
end 

## results given after running once to compile 
@time a = map(function_split,large_matrix); ## 6.499737 seconds (22.00 M allocations: 2.899 GB, 13.99% gc time) 
@time b = parallel_split(); ## 1.097586 seconds (1.50 M allocations: 64.508 MB, 3.28% gc time) 

julia> a == b 
true 

Примечание: даже при этом ускорение не является совершенным из нескольких процессов. Но этого следует ожидать, поскольку в результате вашей функции все еще требуется умеренное количество данных, и данные должны быть перемещены, принимая время.

P.S. См. Этот пост (Julia: How to copy data to another processor in Julia) или этот пакет (https://github.com/ChrisRackauckas/ParallelDataTransfer.jl) для получения дополнительной информации о функциях sendto и getfrom, которые я использовал здесь.