2017-01-17 11 views
6

Давайте на минуту следующего кодаRx distinctUntilChanged позволяет повторение через настраиваемое время между событиями

Rx.Observable.merge(
    Rx.Observable.just(1), 
    Rx.Observable.just(1).delay(1000) 
).distinctUntilChanged() 
    .subscribe(x => console.log(x)) 

Мы ожидаем, что 1 регистрируются только один раз. Однако что, если мы хотим разрешить повторение значения, если его последняя эмиссия была конфигурируемой величиной времени назад? Я имею в виду, чтобы получил оба события в журнале.

Например, было бы здорово иметь что-то вроде следующего

Rx.Observable.merge(
    Rx.Observable.just(1), 
    Rx.Observable.just(1).delay(1000) 
).distinctUntilChanged(1000) 
    .subscribe(x => console.log(x)) 

В какой distinctUntilChanged() принимает какой-то тайм-аут, чтобы позволить повторения на следующий элемент. Однако такой вещи не существует, и мне было интересно, знает ли кто-нибудь изящный способ добиться этого, используя высокоуровневые операторы, не спускаясь с фильтром, который требует обработки состояния.

ответ

8

Если я не имею в непонимании Я довольно уверен, что это может быть достигнуто в относительно прямой вперед образом с windowTime:

Observable 
    .merge(
    Observable.of(1), 
    Observable.of(1).delay(250), // Ignored 
    Observable.of(1).delay(700), // Ignored 
    Observable.of(1).delay(2000), 
    Observable.of(1).delay(2200), //Ignored 
    Observable.of(2).delay(2300) 
) 
    // Converts the stream into a stream of streams each 1000 milliseconds long 
    .windowTime(1000) 
    // Flatten each of the streams and emit only the latest (there should only be one active 
    // at a time anyway 
    // We apply the distinctUntilChanged to the windows before flattening 
    .switchMap(source => source.distinctUntilChanged()) 
    .timeInterval() 
    .subscribe(
    value => console.log(value), 
    error => console.log('error: ' + error), 
    () => console.log('complete') 
); 

Смотрите пример here (заимствованный @ например, входы Мартина)

+0

Удивительный, очень умное решение проблемы - должен быть принятый ответ! – olsn

+1

Не знал о операторе окна. Я реализовал что-то вроде этого сложным способом, но это действительно упрощает его. Благодаря! Хотя я заметил, что это может испускать повторяющиеся элементы, если они происходят вблизи границы окна (например, 990 мс и 1010 мс). Я изменил его, чтобы сделать тайм-аут границы относительно последнего отдельного элемента: 'obs.window (obs.distinctUntilChanged(). SwitchMap (s => Observable.timer (1000))). См. Измененный [пример] (https://jsbin.com/soyovumeja/edit?js, консоль). –

2

Это интересный прецедент. Интересно, есть ли простое решение, чем моя (обратите внимание, что я использую RxJS 5):

let timedDistinctUntil = Observable.defer(() => { 
    let innerObs = null; 
    let innerSubject = null; 
    let delaySub = null; 

    function tearDown() { 
     delaySub.unsubscribe(); 
     innerSubject.complete(); 
    } 

    return Observable 
     .merge(
      Observable.of(1), 
      Observable.of(1).delay(250), // ignored 
      Observable.of(1).delay(700), // ignored 
      Observable.of(1).delay(2000), 
      Observable.of(1).delay(2200), // ignored 
      Observable.of(2).delay(2300) 
     ) 
     .do(undefined, undefined,() => tearDown()) 
     .map(value => { 
      if (innerObs) { 
       innerSubject.next(value); 
       return null; 
      } 

      innerSubject = new BehaviorSubject(value); 

      delaySub = Observable.of(null).delay(1000).subscribe(() => { 
       innerObs = null; 
      }); 

      innerObs = innerSubject.distinctUntilChanged(); 
      return innerObs; 
     }) 
     // filter out all skipped Observable emissions 
     .filter(observable => observable) 
     .switch(); 
}); 

timedDistinctUntil 
    .timestamp() 
    .subscribe(
     value => console.log(value), 
     error => console.log('error: ' + error), 
     () => console.log('complete') 
    ); 

Смотреть демо: https://jsbin.com/sivuxo/5/edit?js,console

Вся логика заворачивают в Observable.defer() статический метод, поскольку он требует некоторых дополнительных переменные.

Пара очков, как это все работает:

  1. merge() является источником элементов.

  2. Я использую do(), чтобы правильно поймать, когда источник завершен, поэтому я могу отключить внутренний таймер и отправить полное уведомление.

  3. Оператор map() - это место, где происходят самые интересные вещи. Я возвращаю значение, которое он получил, а затем возвращает null, если уже существует действующий Observable (он был создан менее 1000 мс назад = innerObs != null). Затем я в конечном итоге создаю новый предмет, в котором я собираюсь переименовать все предметы и вернуть это BehaviorSubject, связанное с .distinctUntilChanged(). В конце я планирую задержку 1 с для установки innerObs = null, что означает, что тогда, когда придет другое значение, она вернет новый Observable с новым .distinctUntilChanged().

  4. Тогда filter() позволит мне игнорировать все возвращаемые значения null. Это означает, что он не будет генерировать новый Наблюдаемый более одного раза в секунду.

  5. Теперь мне нужно работать с так называемым высшим порядком Наблюдаемым (Наблюдаемый излучающим наблюдаемым. По этой причине я использую switch() оператор, который всегда подписывается только новейшим Observable, излучаемый источником. В нашем случае мы генерируем Наблюдаемый только макс один раз в секунду (благодаря используемому выше filter()), и это внутреннее само Наблюдение может испускать столько значений, которые ему нужны, и все они пройдут через distinctUntilChanged(), поэтому дубликаты игнорируются.

Выход для этой демонстрации будет выглядеть следующий вывод:

Timestamp { value: 1, timestamp: 1484670434528 } 
Timestamp { value: 1, timestamp: 1484670436475 } 
Timestamp { value: 2, timestamp: 1484670436577 } 
complete 

Как вы можете видеть значение 1 излучается дважды с задержкой CCA 2s. Однако значение 2 прошло без проблем после 100 мс благодаря distinctUntilChanged().

Я знаю, что это не просто, но я надеюсь, что это имеет смысл к вам :)

+0

Хорошего ответа - мне понравился этот USECASE, а также - может быть даже стоит создать собственный оператор - я мог себе представить, все больше людей будут иметь этот вопрос. – olsn