В RxJS 5 я хотел бы сделать это следующим образом:
Observable.range(1, 25)
.bufferCount(5)
.concatMap(batch => { // process images
console.log('process', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('p' + val).delay(300))
.toArray();
})
.concatMap(batch => { // send images
console.log('send batch', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('s' + val).delay(500))
.toArray();
})
.subscribe(val => {
// console.log('response');
console.log('response', val);
});
С bufferCount
оператора Я разделил входной массив в пакетах по 5 элементов. Затем каждая партия сначала обрабатывается с первым concatMap()
(я использую concat специально, потому что хочу дождаться завершения вложенного Observable). Затем обработанные данные отправляются другому concatMap()
, который отправляет его на ваш сервер.
Я использую два оператора delay()
для имитации того, что разные задачи занимают разные сроки. В нашем случае обработка изображений происходит очень быстро, поэтому первый concatMap
будет испускать элементы быстрее, чем второй concatMap
способен отправлять их на сервер, который в порядке. Обработанные изображения будут уложены внутри concatMap
и будут отправляться партиями один за другим.
Выход из этого примера будет выглядеть следующим образом:
process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
process [ 6, 7, 8, 9, 10 ]
process [ 11, 12, 13, 14, 15 ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
process [ 16, 17, 18, 19, 20 ]
process [ 21, 22, 23, 24, 25 ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]
Смотреть демо: https://jsbin.com/mileqa/edit?js,console
Однако, если вы хотите, чтобы всегда первый процесс партии, чем отправить его, и когда он послал, чем продолжить с другой вам нужно будет переместить второй внутренний наблюдаемый из concatMap
в конце toArray()
в первый вызов concatMap()
.
.concatMap(batch => { // process images
console.log('process', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('p' + val).delay(100))
.toArray()
.concatMap(batch => { // send images
console.log('send batch', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('s' + val).delay(500))
.toArray();
});
})
Смотреть демо: https://jsbin.com/sabena/2/edit?js,console
Это производит вывод, как следующее:
process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
process [ 6, 7, 8, 9, 10 ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
process [ 11, 12, 13, 14, 15 ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
process [ 16, 17, 18, 19, 20 ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
process [ 21, 22, 23, 24, 25 ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]
Вы можете видеть, что «процесс», «отправить партию» и «ответ» журналы для того, ,
Реализация в RxJS 4 должна быть почти идентичной (только имена операторов могут несколько отличаться).
В RxJS 4 есть также controlled()
operator, который не exsit в RxJS 5 (еще?). Возможно, что-то очень похоже на то, что вам нужно.
Btw вы используете RxJS 5 или RxJS 4? – martin
Я использую RXJs 4 – Roaders