2016-08-29 2 views
2

Я пытаюсь связать вызовы с сервером максимум до maxEntries, но не хочу ждать дольше, чем maxWait ms. Это имело обыкновение быть доступным как windowWithTimeOrCount() в RxJS 4, но было удалено из RxJS 5.RxJS - using windowWhen() для maxWait и maxElements windows

Все работает отлично, за исключением того, что последний элемент окна потерян. И говоря о «потерянном» - вот как я себя чувствую сейчас. Любой Гуру RxJS, который мог бы рассказать мне, что я делаю неправильно?

private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> { 

    // We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries 
    // but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that 
    // complete after maxEntries/maxWait (whatever comes first). 
    const toggleSubject = new Subject<void>(); 

    return queue 

    // Start emitting a new Observable every time toggleSubject emits. 
    // (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the 
    // complete collection) 
     .windowWhen(() => toggleSubject) 

     // map() is called once for every window (maxEntries/maxWait) 
     // the inner do() is called for every element in the window, allowing us to set up the timeout callback and to 
     // count all elements, then emitting on toggleSubject, triggering a new Observable. 
     // (We have to map() here - instead of an outer do() - because otherwise the original obs would be streamed 
     // and the hooked up version with the inner do() would never be called.) 
     .map((obs) => { 
     // counts the number of cacheEntries already in this stream 
     let count = 0; 
     // flag to kill the timeout callback 
     let done = false; 
     // we have to return an Observable 
     return obs.do(() => { 
      count++; 
      if (count === 1) { 
       // we start counting when the first element is streamed. 
       IntervalObservable.create(maxWait).first().subscribe(() => { 
       if (!done) { 
        //trigger due to maxWait 
        toggleSubject.next(null); 
       } 
       }); 
      } 
      if (count > (maxEntries)) { 
       done = true; 
       // trigger due due to maxEntries(' + maxEntries + ')'); 
       toggleSubject.next(null); 
      } 
      } 
     ); 
     }); 
    } 

элемент, который вызывает toggleSubject.next(null) вследствие if (count > (maxEntries)) теряется (не в любом окне).

EDIT: maxTime начинает тикать в момент нажатия первого элемента нового наблюдаемого. if (count === 1). Это: a) причина, по которой я работаю изнутри оконных наблюдений в map() и b) важна, потому что это необходимое поведение.

Пример: maxElements: 100, maxWait: 100. 101 Элементы выдвинуты при t = 99. Ожидаемое поведение: при t = 99 выдвигается Observable с 100 элементами. 1 Элемент оставлен. Сброс счетчика + таймера. При t = 199 счетчик для второго «куска» истекает и нажимает Observable на 1 элемент.

(В этом примере Brandons (см ответа код) будет - если я прочитал это правильно - нажать наблюдаемым при Т = 99 с 100 элементами и один мс позже, при Т = 100, наблюдаемым с одним элементом.)

ответ

2

Да, вы не хотите использовать map для таких побочных эффектов. Как вы заметили, вы в конечном итоге бросаете предметы.

Вот общий метод, который, я думаю, будет делать то, что вы хотите.

Примечание: RXJS 5 в настоящее время имеет issue с определением типа для этой перегрузки публикации. Я добавил некоторые приведения типов, которые должны позволить компилировать в TypeScript.

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> { 
    // use publish() so that we can subscribe multiple times to the same stream of data. 
    return queue.publish(entries => { 
     // observable which will trigger after maxWait 
     const timer = IntervalObservable.create(maxWait); 
     // observable which will trigger after maxEntries 
     const limit = entries.take(maxEntries).last(); 
     // observable which will trigger on either condition 
     const endOfWindow = limit.takeUntil(timer); 

     // use endOfWindow to close each window. 
     return entries.windowWhen(() => endOfWindow) as Observable<T>; 
    }) as Observable<Observable<T>>; 
} 

Edit:

Если вы не хотите, таймер, чтобы начать только после первой детали приезжает в каждом окне, то вы можете сделать это так:

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> { 
    // use publish() so that we can subscribe multiple times to the same stream of data. 
    return queue.publish(entries => { 
     // observable which will trigger after maxWait after the first 
     // item in this window arrives: 
     const timer = entries.take(1).delay(maxWait); 
     // observable which will trigger after maxEntries 
     const limit = entries.take(maxEntries).last(); 
     // observable which will trigger on either condition 
     const endOfWindow = limit.takeUntil(timer); 

     // use endOfWindow to close each window. 
     return entries.windowWhen(() => endOfWindow) as Observable<T>; 
    }) as Observable<Observable<T>>; 
} 
+0

Прохладный. Благодарю. Выглядит хорошо (и я сам глуп :-() Я постараюсь это завтра. Мой подход был немного сложнее ... – RAlfoeldi

+0

Хммм ... хорошо выглядит, но не делает этого. 'publish() 'возвращает Наблюдаемый , что ожидается Наблюдаемый <Наблюдаемый >. То, как я читаю ваш код, таймер запускается независимо (каждый раз maxWait ms независимо от того, что происходит в потоке). Именно по этой причине я пытаюсь получить обратную связь внутри «windowWhen()» Observable - я начинаю считать, когда приходят первые элементы. (См. Op edit.) – RAlfoeldi

+0

Эта перегрузка публикации - это публикация (func: (Observable ) => Наблюдаемый ): Наблюдаемый '. В других слова publish() возвращает любой тип наблюдаемого фабричного метода, который вы возвращаете. В этом случае я возвращаю Observable >. – Brandon

0

Решения Я придумал переключение windowWhen() в планировщик async.

if (count === (maxEntries)) { 
    done = true; 
    this.LOGGER.debug(' - trigger due due to maxEntries(' + maxEntries + ')'); 
    Rx.Scheduler.async.schedule(()=>toggleSubject.next(null)); 
} 

Проблема заключалась в том, что windowWhen() завершена возвращенные наблюдаемыми немедленно - предотвращение каких-либо вниз по течению операторов от приема, что последнее значение.

Извините за вопрос (и ответ). Я пробовал Rx.Scheduler.async и т. Д., Прежде чем отправлять здесь, но почему-то это не сработало.

+0

Обратите внимание, что ваши окна могут содержать больше, чем 'maxEntries', если элементы вставляются в наблюдаемые синхронно, так как все они будут проталкиваться через текущее окно перед выполнением асинхронного вызова. Сложности, связанные с написанием таких операций, являются одной из причин, по которым общий совет заключается в том, чтобы * не * использовать Субъекты и вместо этого строить свой оператор из существующих операторов. – Brandon