2016-10-01 1 views
1

Я пытаюсь сгруппировать свои наблюдаемые значения в группы, используя windowCount, и для каждого значения каждого запроса на отправку группы.
Затем объедините эти группы, чтобы запросы следующей группы не запускались до того, как запрос текущей группы не будет завершен.
Проблема в том, что некоторые значения пропускаются.

Вот мой код.
(Я не делаю настоящих вызовов ajax здесь, но Observable.timer должен работать для примера).windowCount dropping values ​​

Observable.interval(300) 
    .take(12) 
    .windowCount(3) 
    .concatMap(obs => { 
     return obs.mergeMap(
      v => Observable.timer(Math.random() * 1500).mapTo(v) 
     ); 
    }) 
    .do(v => console.log(v)) 
    .finally(() => console.log('fin')) 
    .subscribe(); 

Я попытался заменить windowCount, создав группы вручную. И он отлично работает. Никакие значения не пропускаются.

Observable.interval(900) 
    .take(4) 
    .map(i => Observable.interval(300).take(3).map(j => j + i * 3)) 
    .concatMap(obs => { 
     return obs.mergeMap(
      v => Observable.timer(Math.random() * 1500).mapTo(v) 
     ); 
    }) 
    .do(v => console.log(v)) 
    .finally(() => console.log('fin')) 
    .subscribe(); 

У меня было впечатление, что windowCount должен группировать излучаемые значения одинаково.
Но, видимо, он делает что-то еще.

Я был бы очень благодарен за любое объяснение его поведения.

Спасибо!

ответ

0

Недостающие значения являются результатом использования горячего наблюдаемого (Observable.interval(300)), который продолжает выдавать значения, которые вы не храните для использования.

Ниже приведена небольшая упрощенная версия вашего кода, которая также регистрирует время, в которое исчисляются числа. Я заменил Math.random() на 1 так, чтобы выход был детерминированным. Я также загрузил код в jsbin для вас попробовать:

https://jsbin.com/burocu/edit?js,console

Observable.interval(300) 
    .do(x => console.log(x + ") hot observable at: " + (x * 300 + 300))) 
    .take(12) 
    .windowCount(3) 
    .do(observe3 => {observe3.toArray() 
     .subscribe(x => console.log(x + " do window count at: " + (x[2] * 300 + 300)));}) 
    .concatMap(obs => { 
     return obs.mergeMap(
      v => Observable.timer(1 * 1500).mapTo(v) 
     ) 
     .do(v => console.log(v + " merge map at: " + (v * 300 + 300 + 1500))); 
    }) 
    .finally(() => console.log('fin windowCount')) 
    .subscribe(); 

Это приводит к следующей продукции. Обратите внимание, что горячие наблюдаемые маршируют, пока другие операторы все еще обрабатываются.

Это то, что создает впечатление, что ценности отбрасываются. Вы можете видеть, что windowCount(3) делает что Вы подумали, но не когда Вы подумали.

"0) hot observable at: 300" 
"1) hot observable at: 600" 
"2) hot observable at: 900" 
"0,1,2 do window count at: 900" 
"3) hot observable at: 1200" 
"4) hot observable at: 1500" 
"5) hot observable at: 1800" 
"3,4,5 do window count at: 1800" 
"0 merge map at: 1800" 
"6) hot observable at: 2100" 
"1 merge map at: 2100" 
"7) hot observable at: 2400" 
"2 merge map at: 2400" 
"8) hot observable at: 2700" 
"6,7,8 do window count at: 2700" 
"9) hot observable at: 3000" 
"10) hot observable at: 3300" 
"11) hot observable at: 3600" 
"9,10,11 do window count at: 3600" 
" do window count at: NaN" 
"8 merge map at: 4200" 
"fin windowCount" 

Редактировать: дальнейшее объяснение ...

После windowCount(3) есть вызов concatMap. concatMap представляет собой комбинацию map и concatAll.

concatAll:

присоединяется каждый Наблюдаемое, излучаемый источником (более высокого порядка Наблюдаемый), последовательным способом. Он присоединяется к каждому внутреннему Observable только после того, как предыдущий внутренний наблюдаемый завершил (выделено мной) и объединяет все их значения в возвращаемое наблюдаемое.

Итак, глядя на выходы выше, мы видим, что первые значения windowCount(3) [0,1,2] испускаются между 1800 и 2400.

Обратите внимание, что вторые windowCount(3) значения [3,4,5] излучаются 1800. concatAll не готов подписаться, когда [3,4,5] излучается потому что предыдущий внутренний Наблюдаемые не завершенного еще. Таким образом, эти значения эффективно удаляются.

Далее, обратите внимание, что предыдущий внутренний Наблюдаемое [0,1,2] завершается в 2400. concatAll подписывается на 2400.

Следующее значение появится это значение 8 при 2700 (300мс после того, как подписка началась в 2400). Затем значение 8 выводится на mergeMap на 4200 из-за задержки интервала 300 от начальной точки подписки 2400, а затем задержки таймера 1500 (то есть 2400 + 300 + 1500 = 4200).

После этого момента последовательность завершена, поэтому никакие дополнительные значения не выбрасываются.

Пожалуйста, добавьте комментарий, если требуется дополнительное разъяснение.

 Смежные вопросы

  • Нет связанных вопросов^_^