2010-10-25 4 views
6

Я изучал возможности параллельной/асинхронной обработки Ruby и читал много статей и сообщений в блогах. Я просмотрел EventMachine, волокнам, Revactor, Реи, и т.д., и т.д. К сожалению, я не смог найти простое, эффективное (и не-IO-блокирующий) решения для этого очень простого случая использования:Ruby параллелизм/асинхронная обработка (с простым прецедентом)

File.open('somelogfile.txt') do |file| 
    while line = file.gets  # (R) Read from IO 
    line = process_line(line) # (P) Process the line 
    write_to_db(line)   # (W) Write the output to some IO (DB or file) 
    end 
end 

ли вы можете видеть, мой маленький скрипт выполняет три операции чтения (R), процесс (P) & записи (W). Давайте предположим, что - для простоты - что каждая операция занимает ровно 1 единицу времени (например, 10 мс), поэтому текущий код будет делать что-то подобное (5 строк):

Time:  123456789(15 units in total) 
Operations: RPWRPWRPWRPWRPW 

Но я хотел бы, чтобы сделать что-то как это:

Time:  1234567 (7 units in total) 
Operations: RRRRR 
      PPPPP 
       WWWWW 

Очевидно, что я мог бы запустить три процесса (читатель, процессор & писателя) и передать читать строки из считывателя в очередь процессора, а затем передать обработанные строки в очередь писателя (все координируется с помощью, например, RabbitMQ) , Но прецедент настолько прост, что он просто не чувствует себя хорошо.

Любые подсказки о том, как это можно сделать (без переключения с Ruby на Erlang, Closure или Scala)?

+1

Должны ли быть вызваны записи в том же порядке, в каком они были прочитаны? –

+0

Нет, все дело в том, что они могут быть полностью асинхронными. – Dim

ответ

1

Отметьте персик (http://peach.rubyforge.org/). Выполнение параллели «каждый» не может быть проще. Однако, как говорится в документации, вам нужно запустить под JRuby, чтобы использовать собственную поточную передачу JVM.

См. Ответ Йорга Миттага на номер this SO question для подробного описания возможностей многопоточности различных интерпретаторов Ruby.

+0

Хмм, персик на самом деле не то, что я ищу. Я не хочу запускать RPW параллельно, я хочу отделить 3 задачи друг от друга и запустить их асинхронно. Ответ Йорга Миттага дает большое представление. Я хорошо осведомлен о предлагаемых вариантах, но ни один из них, похоже, не ответил на мои проблемы. – Dim

3

Если вам нужно, чтобы он был действительно параллельным (из одного процесса), я считаю, что вам нужно будет использовать JRuby для получения истинных собственных потоков и без GIL.

Вы можете использовать что-то вроде DRb для распространения обработки по нескольким процессам/ядрам, но для вашего использования это немного. Вместо этого вы могли бы попытаться иметь несколько процессов взаимодействуют с помощью труб:

$ cat somelogfile.txt | ruby ./proc-process | ruby ./proc-store 

В этом случае каждая часть его собственный процесс, который может работать параллельно, но общается с помощью STDIN/STDOUT. Это, пожалуй, самый простой (и самый быстрый) подход к вашей проблеме.

# proc-process 
while line = $stdin.gets do 
    # do cpu intensive stuff here 
    $stdout.puts "data to be stored in DB" 
    $stdout.flush # this is important 
end 

# proc-store 
while line = $stdin.gets do 
    write_to_db(line) 
end 
+1

Я подумал, что GIL Ruby 1.9 позволяет вам делать материал процессора в одном потоке, а другой поток - I/O, то есть он запрещает только два потока, делающие процессор. –

+0

Вы говорите о волокнах?Мое ограниченное понимание Fibers заключается в том, что вместо потоков, каждый из которых имеет общий объем процессорного времени, ваш код явно передает обработку Fiber, которая может обрабатывать блокирующую операцию ввода-вывода и немедленно возвращаться к вызывающему коду. Это уменьшает количество времени, которое вы тратите на ожидание, но я не думаю, что это позволит вам охватить более одного процессора за процесс. Я думаю, что GIL означает, что только один поток исполнения может работать в любой момент времени. http://www.igvita.com/2009/05/13/fibers-cooperative-scheduling-in-ruby/ – JEH

+2

Использование труб - хорошее решение для разбиения проблемы на 3 отдельных процесса, но оно не является асинхронным. Фактически это «обходной путь Ruby», поэтому его сложно реализовать в рамках большего приложения. «Проблема», описанная выше, является простым примером обработки, управляемой IO. Я пытаюсь понять, что Ruby способен в этой области и чего может не хватать. – Dim