2013-09-29 2 views
15

У меня есть большой код, и шаг агрегации является текущим узким местом с точки зрения скорости.Ускоренная группа данных. Таблица с использованием нескольких ядер и параллельного программирования

В моем коде я хотел бы ускорить шаг группировки данных, чтобы быть быстрее. SNOTE (простой не тривиальный пример) мои данные выглядит следующим образом:

library(data.table) 
a = sample(1:10000000, 50000000, replace = TRUE) 
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE) 
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE) 
e = a 
dt = data.table(a = a, b = b, d = d, e = e) 
system.time(c.dt <- dt[,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1], by=a)]) 
    user system elapsed 
60.107 3.143 63.534 

Это довольно быстро для такого примера больших данных, но в моем случае я все еще ищу для дальнейшего ускорения. В моем случае у меня есть несколько ядер, поэтому я почти уверен, что должен быть способ использовать такие вычислительные возможности.

Я открыт для изменения моего типа данных на объекты data.frame или idata.frame (теоретически idata.frame предположительно быстрее, чем data.frames).

Я провел некоторое исследование и, похоже, пакет plyr имеет некоторые параллельные возможности, которые могут быть полезны, но я все еще борется за то, как это сделать для группировки, которую я пытаюсь сделать. В another SO post they discuss some of these ideas. Я все еще не уверен в том, насколько я достиг этого с помощью этой распараллеливания, поскольку он использует функцию foreach. По моему опыту, foreach function не является хорошей идеей для миллионов быстрых операций, потому что усилие связи между ядрами приводит к замедлению усилий по параллелизации.

+0

Просьба уточнить, что означают слова «concatenate» и «aggregate». Функции, которые они приходят на ум, - это 3: 'list',' c' и 'paste'. И какова функция этого кода. Вынимаем ли столбцы из фреймов данных или работаем над data.tables? Каковы структуры «block.read.parent.cigar» и других входных переменных ..... лучше объясните эту проблему! (По-видимому, кто-то другой согласен. Это не мой downvote.) –

+0

@Dwin, спасибо! Я не уверен, достаточно ли я уточнил в Q, но основной вопрос - как ускорить операцию агрегации для большой таблицы данных, подобной той, что приведена в примере выше. Также иметь в виду, что я могу использовать несколько ядер, поэтому могут быть некоторые идеи умной паралелинизации, которые могут значительно ускорить такую ​​операцию. Надеюсь, что это поможет, я добавил пример – Dnaiel

+0

Я тоже не занимаюсь ничем, но похоже, что хранение ваших данных таким образом (с символьными векторами), как правило, будет медленным, и объединение их будет только замедлять вас (если вы не экспорт для использования в другом программном обеспечении), так как вам придется разбить строки снова и снова для анализа. Вероятно, вы должны использовать специализированный пакет для сигар ... Я ничего не знаю об этом, но вы уже были направлены к одному в более раннем вопросе ... http: //stackoverflow.com/q/18969698/1191259 – Frank

ответ

8

Если у вас есть несколько ядер, доступных для вас, почему бы не использовать тот факт, что вы можете быстро фильтровать & группы строк в data.table, используя свой ключ:

library(doMC) 
registerDoMC(cores=4) 


setkey(dt, "a") 

finalRowOrderMatters = FALSE # FALSE can be faster 
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=finalRowOrderMatters) %dopar% 
    dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])] 

Обратите внимание, что если числа уникальных групп (т. е. length(unique(a))) относительно невелик, будет быстрее отказаться от аргумента .combine, вернуть результаты в список, а затем вызвать rbindlist по результатам. При тестировании на двух ядрах & 8 ГБ ОЗУ порог был около 9000 уникальных значений. Вот то, что я использовал для сравнения:

# (otion a) 
round(rowMeans(replicate(3, system.time({ 
# ------- # 
    foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar% 
    dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])] 
# ------- # 
}))), 3) 
# [1] 1.243 elapsed for N == 1,000 
# [1] 11.540 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617 
# [1] 57.404 elapsed for N == 50,000 



# (otion b) 
round(rowMeans(replicate(3, system.time({ 
# ------- # 
    results <- 
     foreach(x=unique(dt[["a"]])) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])] 
    rbindlist(results) 
# ------- # 
}))), 3) 
# [1] 1.117 elapsed for N == 1,000 
# [1] 10.567 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617 
# [1] 76.613 elapsed for N == 50,000 


## And used the following to create the dt 
N <- 5e4 
set.seed(1) 
a = sample(1:N, N*2, replace = TRUE) 
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE) 
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE) 
e = a 
dt = data.table(a = a, b = b, d = d, e = e, key="a") 
+4

Требуется ли для каждого подпроцесса копирование по полной таблице данных или все они имеют доступ к «основному» объекту data.table? – Zach

12

Можете ли вы Распараллеливать агрегацию с data.table? Да.

Стоило ли? NO. Это ключевой момент, который не удалось выделить в предыдущем ответе.

В качестве Matt Dowle объясняется в data.table and parallel computing, копии («куски») должны быть выполнены перед распределением при параллельной работе. Это замедляет работу. В некоторых случаях, когда вы не может использовать data.table (например, работает много линейных регрессий), стоит разделить задачи между ядрами. Но не агрегация - по крайней мере, когда участвует data.table.

Короче говоря (и до тех пор, пока не доказано обратное), агрегат с помощью data.table и перестать беспокоиться о потенциальных увеличении скорости с использованием doMC. data.table уже блистает по сравнению с чем-либо еще доступным, когда дело доходит до агрегации - даже если это не многоядерность!


Вот некоторые тесты вы можете запустить для себя сравнение data.table внутренней агрегации с помощью by с foreach и mclapply. Результаты перечислены первыми.

#----------------------------------------------- 

# TL;DR FINAL RESULTS (Best to Worst) 
# 3 replications, N = 10000: 
# (1) 0.007 -- data.table using `by` 
# (2) 3.548 -- mclapply with rbindlist 
# (3) 5.557 -- foreach with rbindlist 
# (4) 5.959 -- foreach with .combine = "rbind" 
# (5) 14.029 -- lapply 

# ---------------------------------------------- 

library(data.table) 

## And used the following to create the dt 
N <- 1e4 
set.seed(1) 
a = sample(1:N, N*2, replace = TRUE) 
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE) 
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE) 
e = a 
dt = data.table(a = a, b = b, d = d, e = e, key="a") 
setkey(dt, "a") 

# TEST AGGREGATION WITHOUT PARALLELIZATION --------------------------- 
## using data.tables `by` to aggregate 
round(rowMeans(replicate(3, system.time({ 
    dt[,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1], by=a)] 
}))), 3) 
# [1] 0.007 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617 

## using `lapply` 
round(rowMeans(replicate(3, system.time({ 
    results <- lapply(unique(dt[["a"]]), function(x) { 
     dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1])] 
    }) 
    rbindlist(results) 
}))), 3) 
# [1] 14.029 elapsed for N == 10,000 

# USING `mclapply` FORKING --------------------------------- 
## use mclapply 
round(rowMeans(replicate(3, system.time({ 
    results <- mclapply(unique(dt[["a"]]), 
    function(x) { 
     dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])] 
    }, mc.cores=4) 
    rbindlist(results) 
}))), 3) 
# [1] 3.548 elapsed for N == 10,000 


# PARALLELIZATION USING `doMC` PACKAGE --------------------------------- 
library(doMC) 
mc = 4 
registerDoMC(cores=mc) 
getDoParWorkers() 
# [1] 4 

## (option a) by Ricardo Saporta 
round(rowMeans(replicate(3, system.time({ 
    foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar% 
    dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])] 
}))), 3) 
# [1] 5.959 elapsed for N == 10,000 

## (option b) by Ricardo Saporta 
round(rowMeans(replicate(3, system.time({ 
    results <- 
     foreach(x=unique(dt[["a"]])) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])] 
    rbindlist(results) 
}))), 3) 
# [1] 5.557 elapsed for N == 10,000 

registerDoSEQ() 
getDoParWorkers() 
# [1] 1