Недостающие значения являются результатом использования горячего наблюдаемого (Observable.interval(300)
), который продолжает выдавать значения, которые вы не храните для использования.
Ниже приведена небольшая упрощенная версия вашего кода, которая также регистрирует время, в которое исчисляются числа. Я заменил Math.random()
на 1
так, чтобы выход был детерминированным. Я также загрузил код в jsbin для вас попробовать:
https://jsbin.com/burocu/edit?js,console
Observable.interval(300)
.do(x => console.log(x + ") hot observable at: " + (x * 300 + 300)))
.take(12)
.windowCount(3)
.do(observe3 => {observe3.toArray()
.subscribe(x => console.log(x + " do window count at: " + (x[2] * 300 + 300)));})
.concatMap(obs => {
return obs.mergeMap(
v => Observable.timer(1 * 1500).mapTo(v)
)
.do(v => console.log(v + " merge map at: " + (v * 300 + 300 + 1500)));
})
.finally(() => console.log('fin windowCount'))
.subscribe();
Это приводит к следующей продукции. Обратите внимание, что горячие наблюдаемые маршируют, пока другие операторы все еще обрабатываются.
Это то, что создает впечатление, что ценности отбрасываются. Вы можете видеть, что windowCount(3)
делает что Вы подумали, но не когда Вы подумали.
"0) hot observable at: 300"
"1) hot observable at: 600"
"2) hot observable at: 900"
"0,1,2 do window count at: 900"
"3) hot observable at: 1200"
"4) hot observable at: 1500"
"5) hot observable at: 1800"
"3,4,5 do window count at: 1800"
"0 merge map at: 1800"
"6) hot observable at: 2100"
"1 merge map at: 2100"
"7) hot observable at: 2400"
"2 merge map at: 2400"
"8) hot observable at: 2700"
"6,7,8 do window count at: 2700"
"9) hot observable at: 3000"
"10) hot observable at: 3300"
"11) hot observable at: 3600"
"9,10,11 do window count at: 3600"
" do window count at: NaN"
"8 merge map at: 4200"
"fin windowCount"
Редактировать: дальнейшее объяснение ...
После windowCount(3)
есть вызов concatMap
. concatMap
представляет собой комбинацию map
и concatAll
.
concatAll
:
присоединяется каждый Наблюдаемое, излучаемый источником (более высокого порядка Наблюдаемый), последовательным способом. Он присоединяется к каждому внутреннему Observable только после того, как предыдущий внутренний наблюдаемый завершил (выделено мной) и объединяет все их значения в возвращаемое наблюдаемое.
Итак, глядя на выходы выше, мы видим, что первые значения windowCount(3)
[0,1,2] испускаются между 1800 и 2400.
Обратите внимание, что вторые windowCount(3)
значения [3,4,5] излучаются 1800. concatAll
не готов подписаться, когда [3,4,5] излучается потому что предыдущий внутренний Наблюдаемые не завершенного еще. Таким образом, эти значения эффективно удаляются.
Далее, обратите внимание, что предыдущий внутренний Наблюдаемое [0,1,2] завершается в 2400. concatAll
подписывается на 2400.
Следующее значение появится это значение 8 при 2700 (300мс после того, как подписка началась в 2400). Затем значение 8 выводится на mergeMap
на 4200 из-за задержки интервала 300 от начальной точки подписки 2400, а затем задержки таймера 1500 (то есть 2400 + 300 + 1500 = 4200).
После этого момента последовательность завершена, поэтому никакие дополнительные значения не выбрасываются.
Пожалуйста, добавьте комментарий, если требуется дополнительное разъяснение.