Мое решение в кофе с Highland.js:
_ = require('underscore')
H = require('highland')
fs = require('fs')
debug = require('debug')
log = debug('main')
assert = require('assert')
readS = H(fs.createReadStream('walmart.dump')).map((buffer) ->
{ buffer: buffer }
)
MAX_SIZE = 10 ** 7
counter = 0
nextStream =()->
stream = fs.createWriteStream("result/data#{counter}.txt")
wrapper = H.wrapCallback(stream.write.bind(stream))
counter += 1
return wrapper
debug('profile')('start')
s = readS.scan({
size: 0
stream: nextStream()
}, (acc, {buffer}) ->
debug('scan')(acc, buffer)
acc.size += buffer.length
acc.buffer = buffer
if acc.size > MAX_SIZE
debug('notify')(counter - 1, acc.size)
acc.size = 0
acc.stream = nextStream()
log(acc)
return acc
).filter((x)->x.buffer?)
s.parallel 4
s.flatMap((x) ->
debug('flatMap')(x)
x.stream(x.buffer)
)
.done -> debug('profile')('finish')
walmart.dump
представляет собой текстовый файл, который содержит 6Гб текста. Разделение на 649 файлов занимает:
profile start +0ms
profile finish +53s
Bacon не поддерживает противодавление, поэтому это плохо подходит для работы ввода-вывода. Без обратного давления вы можете закончить чтение быстрее, чем вы можете писать и буферизировать без ограничений. – AgentME
Вы можете посмотреть на это http://xgrommx.github.io/rx-book/content/guidelines/when/index.html – xgrommx
Спасибо. Я просмотрел статью раньше. Он проигрывает самую сложную часть: изменение результирующего потока ('appendAsync', который является псевдокодной реализацией' fs.readStream') и обратной связью с файлом, когда он превышает ограничение по размеру. – kharandziuk