2015-06-02 1 views
1

Рассмотрим задачу:Динамические трубопроводы с FRP

  • раздвоение файл по линии
  • строки записи в результате файл
  • если результат файла превышает некоторый размер создать новый файл результатов

Например, если у меня есть файл размером 4gb и split размером 1gb. Результат - четыре файла весом 1 гб.

Я ищу решение с чем-то вроде Rx */Bacon или любой другой подобной библиотеки на любом языке.

+1

Bacon не поддерживает противодавление, поэтому это плохо подходит для работы ввода-вывода. Без обратного давления вы можете закончить чтение быстрее, чем вы можете писать и буферизировать без ограничений. – AgentME

+1

Вы можете посмотреть на это http://xgrommx.github.io/rx-book/content/guidelines/when/index.html – xgrommx

+0

Спасибо. Я просмотрел статью раньше. Он проигрывает самую сложную часть: изменение результирующего потока ('appendAsync', который является псевдокодной реализацией' fs.readStream') и обратной связью с файлом, когда он превышает ограничение по размеру. – kharandziuk

ответ

0

Мое решение в кофе с Highland.js:

_ = require('underscore') 
H = require('highland') 
fs = require('fs') 
debug = require('debug') 
log = debug('main') 
assert = require('assert') 

readS = H(fs.createReadStream('walmart.dump')).map((buffer) -> 
    { buffer: buffer } 
) 
MAX_SIZE = 10 ** 7 
counter = 0 
nextStream =()-> 
    stream = fs.createWriteStream("result/data#{counter}.txt") 
    wrapper = H.wrapCallback(stream.write.bind(stream)) 
    counter += 1 
    return wrapper 


debug('profile')('start') 
s = readS.scan({ 
    size: 0 
    stream: nextStream() 
    }, (acc, {buffer}) -> 
    debug('scan')(acc, buffer) 
    acc.size += buffer.length 
    acc.buffer = buffer 
    if acc.size > MAX_SIZE 
     debug('notify')(counter - 1, acc.size) 
     acc.size = 0 
     acc.stream = nextStream() 
    log(acc) 
    return acc 
).filter((x)->x.buffer?) 

s.parallel 4 
s.flatMap((x) -> 
    debug('flatMap')(x) 
    x.stream(x.buffer) 
) 
.done -> debug('profile')('finish') 

walmart.dump представляет собой текстовый файл, который содержит 6Гб текста. Разделение на 649 файлов занимает:

profile start +0ms 
    profile finish +53s 

 Смежные вопросы

  • Нет связанных вопросов^_^