2015-09-11 8 views
1

Я создаю интерактивную веб-страницу с RxJs.Как я могу превратить наблюдаемый в наблюдаемые длинные наблюдательные наблюдения, которые заканчиваются на определенную ценность?

Это то, что я хочу добиться:

У меня есть приложение, которое генерирует маркеры. Эти токены могут потребляться внешним объектом.

Когда пользователь создает токен, страница начинает опрос веб-сервера по его статусу (потребляется или нет). Когда токен потребляется, страница обновляется.

Итак, когда маркер создан, каждые 2 секунды на сервер отправляется запрос о том, потребляется ли токен.

У меня есть Observable строк, которые представляют собой мои generatedTokens.

У меня уже есть рабочая реализация с использованием класса Rx.Scheduler.default, который позволяет мне делать что-то вручную. Тем не менее, я не могу не чувствовать, что должно быть намного проще и элегантнее.

Это текущий код:

class TokenStore { 
    constructor(tokenService, scheduler) { 
    // actual implementation omitted for clarity 
    this.generatedTokens = Rx.Observable.just(["token1", "token2"]); 

    this.consumedTokens = this.generatedTokens 
     .flatMap(token => 
     Rx.Observable.create(function(observer) { 
      var notify = function() { 
      observer.onNext(token); 
      observer.onCompleted(); 
      }; 
      var poll = function() { 
      scheduler.scheduleWithRelative(2000, function() { 
       // tokenService.isTokenConsumed returns a promise that resolves with a boolean 
       tokenService.isTokenConsumed(token) 
        .then(isConsumed => isConsumed ? notify() : poll()); 
       } 
      ); 
      }; 
      poll(); 
     })); 
    } 
} 

Есть ли что-то вроде метода "RepeatUntil"? Я ищу для реализации, что делает то же самое, что и выше код, но выглядит примерно так:

class TokenStore { 
    constructor(tokenService, scheduler) { 
    // actual implementation omitted for clarity 
    this.generatedTokens = Rx.Observable.just(["token1", "token2"]); 

    this.consumedTokens = this.generatedTokens 
     .flatMap(token => 
     Rx.Observable.fromPromise(tokenService.isTokenConsumed(token)) 
        .delay(2000, scheduler) 
         // is this possible? 
        .repeatUntil(isConsumed => isConsumed === true)); 
    } 
} 
+0

Просто интересно, почему вы применяете длительный опрос. Почему бы не использовать node.js с socket.io или что-то еще? –

+1

На самом деле мы начали с SignalR (мы работаем на сервере .NET), но так как мы столкнулись с некоторыми проблемами сложности и проблемами с сетью (наши проекты обычно развертываются внутри DMZ), мы прибегаем к длительному опросу, чтобы уменьшить сложность и улучшить тестируемость. SignalR также был серьезным препятствием для теста интеграции правильно, и в итоге просто не стоило этого. – Moeri

ответ

3

Funnily достаточно ответ ударил меня через несколько минут после размещения вопроса. Я полагаю, что резиновая резина не может быть такой глупой.

Во всяком случае, ответ состоял из двух частей:

  • RepeatUntil может быть достигнут с помощью комбинации repeat(), filter() и first()

  • fromPromise имеет некоторый внутренний ленивый механизм кэширования, который вызывает последующие подписки на Не запускайте новый запрос AJAX. Поэтому мне пришлось прибегнуть назад к использованию Rx.Observable.create

Решение:

class TokenStore { 
    constructor(tokenService, scheduler) { 
    // actual implementation omitted for clarity 
    this.generatedTokens = Rx.Observable.just(["token1", "token2"]); 

    this.consumedTokens = this.generatedTokens 
     .flatMap(token => 
     // cannot use fromPromise because it caches the results on subsequent subscriptions 
     Rx.Observable.create(observer => { 
      tokenService.isTokenConsumed(token) 
      .then(isConsumed => { 
       observer.onNext(isConsumed); 
       observer.onCompleted(); 
      }, error => { 
       observer.onError(error); 
      }) 
     }) 
     .delay(2000, scheduler) 
     .repeat() 
     .filter(isConsumed => isConsumed === true) 
     .first()) 
    .share(); 
    } 
} 

Несовершеннолетний Sidenote: на «долю()» гарантирует, что оба наблюдаемых горячи, что позволяет избежать сценария, где каждый абонент приведет к тому, что запрос ajax начнет стрельбу.

+0

Обратите внимание, что первый 'share()' в этом примере ничего не делает. Реально, я понимаю, что вы просто издеваетесь над этим, но использование его с 'just' практически одинаково и использует только' just', но с большим количеством накладных расходов. – paulpdaniels

+0

А хорошо поймать, я редактировал свой пример кода, чтобы избежать путаницы. – Moeri

0
class TokenSource { 
    constructor(tokenService, scheduler) { 
    this.generatedTokens = Rx.Observable.just(["token1", "token2"]).share(); 

    this.consumedTokens = this.generatedTokens 
     .flatMap(token => 
     Rx.Observable.interval(2000, scheduler) 
       .flatMap(Rx.Observable.defer(() => 
          tokenService.isTokenConsumed(token))) 
       .first(isConsumed => isConsumed === true)) 
     .share() 

    } 
} 

Вы можете воспользоваться двумя фактами:

  1. flatMap имеет перегрузку, которая принимает наблюдаемым, который необходимо выполнить повторную всякий раз, новое событие приходит в

  2. defer может принять метод, возвращающий обещание. Этот метод будет повторно выполнен для каждой подписки, а это значит, что вам не нужно сворачивать свой собственный код Promise ->Observable.

+0

Я исправляю, заявляя, что это решение будет запускать запрос каждые 2 секунды до тех пор, пока isConsumed не верен? Это отличается от того, что у меня было, что только вызвало другой запрос, когда предыдущий закончил. Спасибо за интересное решение! – Moeri

 Смежные вопросы

  • Нет связанных вопросов^_^