2017-01-16 3 views
0

Задача: Мне нужно контролировать порядок выполнения, в котором задачи обрабатываются параллельно циклом foreach. К сожалению, это не поддерживается foreach.R - doRedis - перезаписать getTask для управления порядком выполнения в параллельных петлях foreach

Решение: Использование doRedis для использования базы данных для хранения всех задач, выполняемых в цикле foreach. Чтобы контролировать порядок, я хочу переписать getTask с помощью setGetTask, чтобы получить задания на основе заранее заданного порядка. Хотя я не мог найти много документации о том, как это сделать.

Дополнительная информация:

  1. Существует небольшой абзац на setGetTask с примером в redis documentation.

    getTask <- function (queue , job_id , ...) 
    { 
    
        key <- sprintf(" 
        redisEval("local x=redis.call('hkeys',KEYS[1])[1]; 
           if x==nil then return nil end; 
           local ans=redis.call('hget',KEYS[1],x); 
           redis.call('hdel',KEYS[1],x);i 
           return ans",key) 
    } 
    
    setGetTask(getTask) 
    

    Я, хотя думаю, что код в документации синтаксически не правильно (отсутствует имхо "и закрывающую скобку„)“). Я думал, что это не возможно на CRAN, как код документации выполняется по представлению.

  2. Изменение функции getTask ничего не меняет в отношении работников, получающих задачи (даже если введение очевидного нонсенса в redisEval, как меняется его redisEval («DDDDDDDDDD (((»)

  3. У меня был доступ к функции setGetTask после установки пакета из sourc е (который я скачал с official CRAN package page of version 1.1.1 (что имхо не должно иметь никакого значения, чем устанавливать его непосредственно из CRAN)

данных: Dataframe задач для выполнения выглядит следующим образом:

taskName;taskQueuePosition;parameter1;paramterN 
taskT;1;val1;10 
taskK;2;val2;8 
taskP;3;val3;7 
taskA;4;val4;7 

Я хочу использовать «taskQueuePosition» для управления порядком, сначала должны выполняться задачи с более низкими номерами.

Вопросы:

  1. Кто-нибудь знает какие-либо источники, где я могу получить больше информации о делать это с doRedis или setGetTask?
  2. Кто-нибудь знает, как мне нужно изменить getTask для достижения описанного выше?
  3. Любые другие умные идеи для управления порядком выполнения в цикле foreach? Предпочтительно, чтобы в какой-то момент я мог использовать doRedis как параллельный задний конец (изменение этого означало бы значительное изменение в обработке из-за сложных технических причин инфраструктуры).

Код (для легкого воспроизведения):

Далее предполагается, что Redis-сервер запускается на локальной машине.

Redis DB Начинка:

library(doRedis) 
library(foreach) 

options('redis:num'=TRUE) # needed for proper execution 

REDIS_JOB_QUEUE = "jobs" 
registerDoRedis(REDIS_JOB_QUEUE) 

# filling up the data frame 
taskDF = data.frame(taskName=c("taskT","taskK","taskP","taskA"), 
      taskQueuePosition=c(1,2,3,4), 
      parameter1=c("val1","val2","val3","val4"), 
      parameterN=c(10,8,7,7)) 

foreach(currTask=iter(taskDF, by='row'), 
     .verbose = T 
) %dopar% { 
    print(paste("Executing task: ",currTask$taskName)) 
    Sys.sleep(currTask$parameterN) 
} 

removeQueue(REDIS_JOB_QUEUE) 

работник:

library(doRedis) 
REDIS_JOB_QUEUE = "jobs" 

startLocalWorkers(n=1, queue=REDIS_JOB_QUEUE) 

ответ

0

я мог бы решить эту проблему и теперь может контролировать порядок выполнения задачи.

Дополнительная информация:

1. Там, кажется, опечатка в документации, что делает пример getTask не работает. Рассматривая форму функции default_getTask из файла task.R в пакете, он должен выглядеть, вероятно, что-то вроде:

getTaskDefault <- function (queue , job_id , ...) 
{ 
    key <- sprintf("%s:%s",queue, job_id) 
    return(redisEval("local x=redis.call('hkeys',KEYS[1])[1]; 
        if x==nil then return nil end; 
        local ans=redis.call('hget',KEYS[1],x); 
        redis.call('set', KEYS[1] .. '.start.' .. x, x); 
        redis.call('hdel',KEYS[1],x); 
        return ans",key)) 
} 

Кажется, что письма за первым знаком процента в первой строке функции заблудились , Это объясняет нечетное количество скобок и котировок.

2) setGetTask все еще не имеет никакого эффекта для меня. Когда я устанавливаю функцию getTask, но через .option, пока заполняется БД (как описано в vignette of the package), он успешно вызван.

3) Информация о 2) означает, что мне не нужна функция getTask, поэтому я могу использовать пакет из CRAN.

----- Вопросы -----

1) Виньетка doRedis описывает, как можно успешно установить пользовательские getTask.

2 и 3) Когда скрипт LUA в функции getTask модифицирован, как показано ниже, задачи извлекаются из базы данных так, как они представлены. Это не совсем то, о чем я просил, но из-за временных ограничений и того факта, что у меня (или лучше было) нет первой идеи о сценарии LUA, это imho - удовлетворительное решение для контроля порядка представления столбцом taskQueuePosition.

getTaskInOrder <- function (queue , job_id , ...) 
{ 

    key <- sprintf("%s:%s",queue, job_id) 
    return(redisEval(" 

     local tasks=redis.call('hkeys',KEYS[1]); -- get all tasks 

     local x=tasks[1];   -- get first task available task 
     if x==nil then    -- if there are no tasks left, stop processing 
      return nil 
     end; 

     local xMin = 65535;   -- if we have more tasks than 65535, getting the 
     -- task with the lowest taskID is not guaranteed to be the first one 
     local i = 1; 
     -- local iMinFound = -1; 
     while (x ~= nil) do   -- search the array until there are no tasks left 
     -- print('x: ',x) 
     local xNum = tonumber(x); 
     if(xNum<xMin) then 
      xMin = xNum; 
      -- iMinFound = i; 
     end 
     i=i+1; 
     -- print('i is now: ',i); 
     x=tasks[i]; 
     end 
     -- print('Minimum is task number',xMin,' found at i ', iMinFound) 
     x=tostring(xMin)   -- convert it back to a string (maybe it would 
            -- be better to keep the original string somewhere, 
            -- in case we loose some information whilst converting to number) 

     -- print('x is now:',x); 
     -- print(KEYS[1] .. '.start.' .. x, x); 
     -- print(''); 
     local ans=redis.call('hget',KEYS[1],x); 
     redis.call('set', KEYS[1] .. '.start.' .. x, x); 
     redis.call('hdel',KEYS[1],x); 
     return ans",key)) 
} 

Важное примечание: я заметил, что если задача прервана, порядок завинчивается и повторно задача (даже несмотря на то, задача номер остается тем же), будет выполняться после того, как первоначально представленный задания. Это нормально для меня.

------ Код (для легкого воспроизведения): ------

Это приводит к следующему примеру кода (с 12 записей в кадре данных задач, вместо оригинального 4):

Redis БД Наполнитель:

library(doRedis) 
library(foreach) 

options('redis:num'=TRUE) # needed for proper execution 

REDIS_JOB_QUEUE = "jobs" 

getTaskInOrder <- function (queue , job_id , ...) 
{ 
    ...like above 
} 

registerDoRedis(REDIS_JOB_QUEUE) 

# filling up the data frame already in order of tasks to be executed 
# otherwise the dataframe has to be sorted by taskQueuePosition 
taskDF = data.frame(taskName=c("taskA","taskB","taskC","taskD","taskE","taskF","taskG","taskH","taskI","taskJ","taskK","taskL"), 
     taskQueuePosition=c(1,2,3,4,5,6,7,8,9,10,11,12), 
     parameter1=c("val1","val2","val3","val4","val1","val2","val3","val4","val1","val2","val3","val4"), 
     parameterN=c(5,5,5,4,4,4,4,3,3,3,2,2)) 

foreach(currTask=iter(taskDF, by='row'), 
     .verbose = T, 
     .options.redis = list(getTask = getTaskInOrder 
) %dopar% { 
    print(paste("Executing task: ",currTask$taskName)) 
    Sys.sleep(currTask$parameterN) 
} 

removeQueue(REDIS_JOB_QUEUE) 

работник:

library(doRedis) 
REDIS_JOB_QUEUE = "jobs" 

startLocalWorkers(n=1, queue=REDIS_JOB_QUEUE) 

Еще одно замечание: только в случае, если вы обрабатываете длинные рабочие места, как я, пожалуйста, обратите внимание a bug in redis 1.1.1 (текущая версия на CRAN), что приводит к задачам быть повторно представлены (из-за тайм-аута), несмотря на работников еще работая над ними.