2012-06-11 3 views
39

Мне нравится установка .progress = 'text' в plyr'sllply. Тем не менее, это вызывает у меня большую тревогу, чтобы не знать, как далеко продвигается mclapply (из пакета multicore), поскольку элементы списка отправляются на различные ядра и затем сортируются в конце.Есть ли способ отслеживать прогресс на mclapply?

Я выводил сообщения, такие как *currently in sim_id # ....*, но это не очень полезно, потому что это не дает мне индикатора того, какой процент элементов списка завершен (хотя полезно знать, что мой скрипт не застрял и перемещение).

Может кто-нибудь предложить другие идеи, которые позволили бы мне посмотреть мой файл .Rout и получить представление о прогрессе? Я подумал о добавлении ручного счетчика, но не вижу, как это реализовать, поскольку mclapply должен завершить обработку всех элементов списка, прежде чем он сможет выдать любую обратную связь.

+1

См. Мой ответ на аналогичный вопрос: http://stackoverflow.com/a/5431265/653825 – otsaw

+0

Отличный ответ здесь @fotNelton и другие, основанные на его повторном использовании. В качестве быстрого решения увидеть прогресс в одноразовых вызовах «mclapply» вы также можете просто «cat (». »)' В рабочей функции. – codeola

+0

Отличный вопрос, 'package multicore' больше недоступен, есть ли обходной путь без пакета' multicore'? – forecaster

ответ

26

В связи с тем, что mclapply порождает несколько процессов, можно использовать фифы, трубы или даже сокеты. Теперь рассмотрим следующий пример:

library(multicore) 

finalResult <- local({ 
    f <- fifo(tempfile(), open="w+b", blocking=T) 
    if (inherits(fork(), "masterProcess")) { 
     # Child 
     progress <- 0.0 
     while (progress < 1 && !isIncomplete(f)) { 
      msg <- readBin(f, "double") 
      progress <- progress + as.numeric(msg) 
      cat(sprintf("Progress: %.2f%%\n", progress * 100)) 
     } 
     exit() 
    } 
    numJobs <- 100 
    result <- mclapply(1:numJobs, function(...) { 
     # Dome something fancy here 
     # ... 
     # Send some progress update 
     writeBin(1/numJobs, f) 
     # Some arbitrary result 
     sample(1000, 1) 
    }) 
    close(f) 
    result 
}) 

cat("Done\n") 

Здесь временный файл используется как ФИФО и основные процесс разветвляется ребенок, только обязанность сообщать текущий прогресс. Основной процесс продолжается, вызывая mclapply, где выражение (точнее, блок выражения), которое должно быть оценено, записывает частичную информацию о ходе в буфер fifo посредством writeBin.

Поскольку это всего лишь простой пример, вам, вероятно, придется адаптировать весь материал вывода к вашим потребностям. НТН!

+0

Действительно ли это отличается от использования стандартных функций 'message' и' sink'? Сообщения от всех дочерних процессов идут в один приемник без задержки, не так ли? – otsaw

+2

В случае 'mclapply' основной процесс ожидает завершения всех дочерних процессов, без наложения другого дочернего процесса нет способа получать и обрабатывать сообщения, пока' mclapply' все еще работает. – fotNelton

+0

@fotNelton: Исходя из моего опыта, дочерние процессы, как представляется, отправляют stdout и stderr так же, как и родительский процесс без каких-либо задержек. Но, возможно, это зависит от ОС? – otsaw

7

Здесь используется функция, основанная на @fotNelton's solution для применения везде, где вы обычно используете mcapply.

mcadply <- function(X, FUN, ...) { 
    # Runs multicore lapply with progress indicator and transformation to 
    # data.table output. Arguments mirror those passed to lapply. 
    # 
    # Args: 
    # X: Vector. 
    # FUN: Function to apply to each value of X. Note this is transformed to 
    #  a data.frame return if necessary. 
    # ...: Other arguments passed to mclapply. 
    # 
    # Returns: 
    # data.table stack of each mclapply return value 
    # 
    # Progress bar code based on https://stackoverflow.com/a/10993589 
    require(multicore) 
    require(plyr) 
    require(data.table) 

    local({ 
    f <- fifo(tempfile(), open="w+b", blocking=T) 
    if (inherits(fork(), "masterProcess")) { 
     # Child 
     progress <- 0 
     print.progress <- 0 
     while (progress < 1 && !isIncomplete(f)) { 
     msg <- readBin(f, "double") 
     progress <- progress + as.numeric(msg) 
     # Print every 1% 
     if(progress >= print.progress + 0.01) { 
      cat(sprintf("Progress: %.0f%%\n", progress * 100)) 
      print.progress <- floor(progress * 100)/100 
     } 
     } 
     exit() 
    } 

    newFun <- function(...) { 
     writeBin(1/length(X), f) 
     return(as.data.frame(FUN(...))) 
    } 

    result <- as.data.table(rbind.fill(mclapply(X, newFun, ...))) 
    close(f) 
    cat("Done\n") 
    return(result) 
    }) 
} 
2

На основании ответа @fotNelson, используя прогресс бар вместо линии по линии печати и вызов внешней функции с mclapply.

library('utils') 
library('multicore') 

prog.indic <- local({ #evaluates in local environment only 
    f <- fifo(tempfile(), open="w+b", blocking=T) # open fifo connection 
    assign(x='f',value=f,envir=.GlobalEnv) 
    pb <- txtProgressBar(min=1, max=MC,style=3) 

    if (inherits(fork(), "masterProcess")) { #progress tracker 
     # Child 
     progress <- 0.0 
     while (progress < MC && !isIncomplete(f)){ 
      msg <- readBin(f, "double") 
       progress <- progress + as.numeric(msg) 

      # Updating the progress bar. 
      setTxtProgressBar(pb,progress) 
      } 


     exit() 
     } 
    MC <- 100 
    result <- mclapply(1:MC, .mcfunc) 

    cat('\n') 
    assign(x='result',value=result,envir=.GlobalEnv) 
    close(f) 
    }) 

.mcfunc<-function(i,...){ 
     writeBin(1, f) 
     return(i) 
    } 

Назначение соединения ФИФО к .GlobalEnv необходимо использовать его из функции вне вызова mclapply. Спасибо за вопросы и предыдущие ответы, мне было интересно, как это сделать какое-то время.

11

По сути, добавив еще один вариант @ fotNelson-й решения, но с некоторыми изменениями:

  • Падением в замене для mclapply (поддерживает все функции mclapply)
  • уловов Ctrl-C звонки и прекращает грациозно
  • использует встроенную (txtProgressBar)
  • возможность отслеживать прогресс или нет и использовать указанный стиль индикатора выполнения
  • скорее всего использует parallel чем multicore который теперь был удален из CRAN
  • X принуждает к списку согласно mclapply (так длина (X) дает ожидаемые результаты)
  • roxygen2 стиль документация в верхнем

Надеется, что это поможет кому-то .. ,

library(parallel) 

#------------------------------------------------------------------------------- 
#' Wrapper around mclapply to track progress 
#' 
#' Based on http://stackoverflow.com/questions/10984556 
#' 
#' @param X   a vector (atomic or list) or an expressions vector. Other 
#'     objects (including classed objects) will be coerced by 
#'     ‘as.list’ 
#' @param FUN  the function to be applied to 
#' @param ...  optional arguments to ‘FUN’ 
#' @param mc.preschedule see mclapply 
#' @param mc.set.seed see mclapply 
#' @param mc.silent see mclapply 
#' @param mc.cores see mclapply 
#' @param mc.cleanup see mclapply 
#' @param mc.allow.recursive see mclapply 
#' @param mc.progress track progress? 
#' @param mc.style style of progress bar (see txtProgressBar) 
#' 
#' @examples 
#' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01)) 
#' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1) 
#' 
#' dat <- lapply(1:10, function(x) rnorm(100)) 
#' func <- function(x, arg1) mean(x)/arg1 
#' mclapply2(dat, func, arg1=10, mc.cores=2) 
#------------------------------------------------------------------------------- 
mclapply2 <- function(X, FUN, ..., 
    mc.preschedule = TRUE, mc.set.seed = TRUE, 
    mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L), 
    mc.cleanup = TRUE, mc.allow.recursive = TRUE, 
    mc.progress=TRUE, mc.style=3) 
{ 
    if (!is.vector(X) || is.object(X)) X <- as.list(X) 

    if (mc.progress) { 
     f <- fifo(tempfile(), open="w+b", blocking=T) 
     p <- parallel:::mcfork() 
     pb <- txtProgressBar(0, length(X), style=mc.style) 
     setTxtProgressBar(pb, 0) 
     progress <- 0 
     if (inherits(p, "masterProcess")) { 
      while (progress < length(X)) { 
       readBin(f, "double") 
       progress <- progress + 1 
       setTxtProgressBar(pb, progress) 
      } 
      cat("\n") 
      parallel:::mcexit() 
     } 
    } 
    tryCatch({ 
     result <- mclapply(X, ..., function(...) { 
       res <- FUN(...) 
       if (mc.progress) writeBin(1, f) 
       res 
      }, 
      mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed, 
      mc.silent = mc.silent, mc.cores = mc.cores, 
      mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive 
     ) 

    }, finally = { 
     if (mc.progress) close(f) 
    }) 
    result 
} 
+0

Эта версия на самом деле не показывает ход выполнения задачи. Индикатор выполнения начинается с 0% и остается там. – Ariel

+0

OK должна быть ошибка - я рассмотрю ее ... – wannymahoots

+0

Эта функция работает для меня на OS X и Linux, так что, возможно, это проблема с окнами. – wannymahoots

6

Пакет pbapply реализовал это для общего случая. И pblapply, и pbsapply имеют аргумент cl. От documentation:

Параллельная обработка может быть разрешена с помощью аргумента cl. parLapply вызывается, когда cl является объектом 'cluster', mclapply вызывается, когда cl является целым числом. Отображение индикатора выполнения увеличивает связь служебных данных между основным процессом и узлами/дочерними процессами, сравнивая с параллельными эквивалентами функций без индикатора выполнения. Функции возвращаются к их исходным эквивалентам, когда индикатор выполнения отключен (то есть getOption("pboptions")$type == "none"dopb() - FALSE). Это значение по умолчанию, если interactive() , если FALSE (т. Е. Вызвано из сценария командной строки R).

Если один не поставляет cl (или проходит NULL) по умолчанию непараллельного lapply используются, включая также прогресс бар.