Я пытаюсь связать вызовы с сервером максимум до maxEntries, но не хочу ждать дольше, чем maxWait ms. Это имело обыкновение быть доступным как windowWithTimeOrCount()
в RxJS 4, но было удалено из RxJS 5.RxJS - using windowWhen() для maxWait и maxElements windows
Все работает отлично, за исключением того, что последний элемент окна потерян. И говоря о «потерянном» - вот как я себя чувствую сейчас. Любой Гуру RxJS, который мог бы рассказать мне, что я делаю неправильно?
private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> {
// We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries
// but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that
// complete after maxEntries/maxWait (whatever comes first).
const toggleSubject = new Subject<void>();
return queue
// Start emitting a new Observable every time toggleSubject emits.
// (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the
// complete collection)
.windowWhen(() => toggleSubject)
// map() is called once for every window (maxEntries/maxWait)
// the inner do() is called for every element in the window, allowing us to set up the timeout callback and to
// count all elements, then emitting on toggleSubject, triggering a new Observable.
// (We have to map() here - instead of an outer do() - because otherwise the original obs would be streamed
// and the hooked up version with the inner do() would never be called.)
.map((obs) => {
// counts the number of cacheEntries already in this stream
let count = 0;
// flag to kill the timeout callback
let done = false;
// we have to return an Observable
return obs.do(() => {
count++;
if (count === 1) {
// we start counting when the first element is streamed.
IntervalObservable.create(maxWait).first().subscribe(() => {
if (!done) {
//trigger due to maxWait
toggleSubject.next(null);
}
});
}
if (count > (maxEntries)) {
done = true;
// trigger due due to maxEntries(' + maxEntries + ')');
toggleSubject.next(null);
}
}
);
});
}
элемент, который вызывает toggleSubject.next(null)
вследствие if (count > (maxEntries))
теряется (не в любом окне).
EDIT: maxTime начинает тикать в момент нажатия первого элемента нового наблюдаемого. if (count === 1)
. Это: a) причина, по которой я работаю изнутри оконных наблюдений в map()
и b) важна, потому что это необходимое поведение.
Пример: maxElements: 100, maxWait: 100. 101 Элементы выдвинуты при t = 99. Ожидаемое поведение: при t = 99 выдвигается Observable с 100 элементами. 1 Элемент оставлен. Сброс счетчика + таймера. При t = 199 счетчик для второго «куска» истекает и нажимает Observable на 1 элемент.
(В этом примере Brandons (см ответа код) будет - если я прочитал это правильно - нажать наблюдаемым при Т = 99 с 100 элементами и один мс позже, при Т = 100, наблюдаемым с одним элементом.)
Прохладный. Благодарю. Выглядит хорошо (и я сам глуп :-() Я постараюсь это завтра. Мой подход был немного сложнее ... – RAlfoeldi
Хммм ... хорошо выглядит, но не делает этого. 'publish() 'возвращает Наблюдаемый, что ожидается Наблюдаемый <Наблюдаемый >. То, как я читаю ваш код, таймер запускается независимо (каждый раз maxWait ms независимо от того, что происходит в потоке). Именно по этой причине я пытаюсь получить обратную связь внутри «windowWhen()» Observable - я начинаю считать, когда приходят первые элементы. (См. Op edit.) –
RAlfoeldi
Эта перегрузка публикации - это публикация (func: (Observable) => Наблюдаемый ): Наблюдаемый '. В других слова publish() возвращает любой тип наблюдаемого фабричного метода, который вы возвращаете. В этом случае я возвращаю Observable >. –
Brandon