2016-11-28 8 views
3

Я работаю над проектом узла, которому необходимо отправить тысячи изображений для обработки. Прежде, чем эти изображения будут загружены на сервер обработки они должны быть изменены так, у меня есть кое-что вдоль линий этого:Понимание противодавления в rxjs - только кеш 5 изображений, ожидающих загрузки

imageList 
    .map(image => loadAndResizeImage) 
    .merge(3) 
    .map(image => uploadImage) 
    .merge(3) 
    .subscribe(); 

Изображение изменение размера, как правило, занимает несколько десятых долей секунды, загрузки и обработки занимает около 4 секунд.

Как я могу предотвратить тысячу изображений с измененным размером, которые растут в памяти, поскольку я жду, когда очередь загрузки будет очищена? Я, вероятно, хочу, чтобы было изменено 5 изображений и ожидало, что, как только загрузка изображения закончится, следующее измененное изображение вытащится из очереди и будет загружено, а новое изображение будет изменено и добавлено в «буфер».

Иллюстрация вопроса можно найти здесь:

https://jsbin.com/webaleduka/4/edit?js,console

Здесь есть шаг нагрузки (с 200 мс) и стадию процесса (с 4 секунды). Каждый процесс ограничивается параллелизмом 2. Мы можем видеть, что с 25 начальными элементами мы получаем 20 изображений в памяти.

Я просмотрел варианты буферов, но ни один из них не делал то, что я хотел сделать.

На данный момент я только что объединил нагрузку, изменил размер и загрузился в один отложенный наблюдаемый, который я объединил с максимальным параллелизмом. Я хотел бы, чтобы изображения ожидали загрузки, хотя я уверен, что это должно быть возможно.

Я использую RXjs 4, но я полагаю, что директора школ будут одинаковыми для 5.

Большое спасибо.

+0

Btw вы используете RxJS 5 или RxJS 4? – martin

+0

Я использую RXJs 4 – Roaders

ответ

0

Я думаю, что мне удалось решить эту проблему с помощью оператора controlled() rxjs:

var queuedImages = 0; 

var imageSource = Rx.Observable.range(1, 25) 
    .map(index => "image_" + index) 
    .controlled(); 

imageSource 
    .map(image => loadImage(image)) 
    .merge(2) 
    .do((image) => { 
    queuedImages++; 
    console.log(`Images waiting for processing: ${queuedImages}`); 
    }) 
    .map(image => processImage(image)) 
    .merge(2) 
    .do(() => { 
    queuedImages--; 
    console.log(`Images waiting for processing: ${queuedImages}`); 

    if(queuedImages < 4){ 
     console.log(`requesting more image loads`); 
     imageSource.request(4-queuedImages); 
    } 
    }) 
    .subscribe( 
    (item) => {}, null, 
    () => console.log(`All Complete`)); 

imageSource.request(4); 

Первоначально 4 изображения запрашиваются. Они загружаются с диска и затем обрабатываются. Когда каждое изображение загружается, а затем обрабатывается, количество изображений в памяти отслеживается с использованием переменной queuedImages. Когда это число опускается ниже 4, запрашивается больше изображений.

jsbin это можно увидеть здесь:

https://jsbin.com/webaleduka/11/edit?js,console

Этот метод означает, что никогда не более чем на 6 или так изображения в кэше и гарантирует, что всегда есть достаточное количество изображений в ожидании кэша для загрузки.

3

В RxJS 5 я хотел бы сделать это следующим образом:

Observable.range(1, 25) 
    .bufferCount(5) 
    .concatMap(batch => { // process images 
     console.log('process', batch); 
     return Observable.from(batch) 
      .mergeMap(val => Observable.of('p' + val).delay(300)) 
      .toArray(); 
    }) 
    .concatMap(batch => { // send images 
     console.log('send batch', batch); 
     return Observable.from(batch) 
      .mergeMap(val => Observable.of('s' + val).delay(500)) 
      .toArray(); 
    }) 
    .subscribe(val => { 
     // console.log('response'); 
     console.log('response', val); 

    }); 

С bufferCount оператора Я разделил входной массив в пакетах по 5 элементов. Затем каждая партия сначала обрабатывается с первым concatMap() (я использую concat специально, потому что хочу дождаться завершения вложенного Observable). Затем обработанные данные отправляются другому concatMap(), который отправляет его на ваш сервер.

Я использую два оператора delay() для имитации того, что разные задачи занимают разные сроки. В нашем случае обработка изображений происходит очень быстро, поэтому первый concatMap будет испускать элементы быстрее, чем второй concatMap способен отправлять их на сервер, который в порядке. Обработанные изображения будут уложены внутри concatMap и будут отправляться партиями один за другим.

Выход из этого примера будет выглядеть следующим образом:

process [ 1, 2, 3, 4, 5 ] 
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ] 
process [ 6, 7, 8, 9, 10 ] 
process [ 11, 12, 13, 14, 15 ] 
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ] 
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ] 
process [ 16, 17, 18, 19, 20 ] 
process [ 21, 22, 23, 24, 25 ] 
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ] 
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ] 
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ] 
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ] 
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ] 
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ] 
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ] 

Смотреть демо: https://jsbin.com/mileqa/edit?js,console

Однако, если вы хотите, чтобы всегда первый процесс партии, чем отправить его, и когда он послал, чем продолжить с другой вам нужно будет переместить второй внутренний наблюдаемый из concatMap в конце toArray() в первый вызов concatMap().

.concatMap(batch => { // process images 
    console.log('process', batch); 
    return Observable.from(batch) 
     .mergeMap(val => Observable.of('p' + val).delay(100)) 
     .toArray() 
     .concatMap(batch => { // send images 
      console.log('send batch', batch); 
      return Observable.from(batch) 
       .mergeMap(val => Observable.of('s' + val).delay(500)) 
       .toArray(); 
     }); 
}) 

Смотреть демо: https://jsbin.com/sabena/2/edit?js,console

Это производит вывод, как следующее:

process [ 1, 2, 3, 4, 5 ] 
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ] 
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ] 
process [ 6, 7, 8, 9, 10 ] 
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ] 
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ] 
process [ 11, 12, 13, 14, 15 ] 
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ] 
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ] 
process [ 16, 17, 18, 19, 20 ] 
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ] 
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ] 
process [ 21, 22, 23, 24, 25 ] 
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ] 
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ] 

Вы можете видеть, что «процесс», «отправить партию» и «ответ» журналы для того, ,

Реализация в RxJS 4 должна быть почти идентичной (только имена операторов могут несколько отличаться).

В RxJS 4 есть также controlled() operator, который не exsit в RxJS 5 (еще?). Возможно, что-то очень похоже на то, что вам нужно.

+0

Спасибо за этот комментарий, но это не решит мою проблему. В первом примере вы все равно будете получать обработанные изображения, всего 5 за раз, а не по 1 за раз. Второй пример будет работать, но отправке изображений придется ждать обработки изображений до их отправки. – Roaders