2017-01-31 2 views
0

Я использую RxJava в асинхронной передачи сообщений среде (vert.x), и поэтому имеют поток, который выглядит следующим образом:retryWhen с таймером появляется ниспровергать поведение на Merge

Observable.defer(() -> getEndpoint()) 
      .mergeWith(getCancellationMessage()) 
      .flatMap(endpoint -> useEndpoint(endpoint)) 
      .retryWhen(obs -> obs.flatMap(error -> { 
       if (wasCancelled(error)) { 
        return Observable.error(error); 
       } 
       return Observable.timer(/* args */) 
      })) 
      .subscribe(result -> useResult(result), 
         error -> handleError(error) 
     ); 

Реализация getCancellationMessage() возвращает наблюдаемый поток, который испускает ошибку всякий раз, когда сообщение отмены было получено от независимого источника сообщения. Этот поток никогда не испускает ничего, кроме Observable.error(), и он испускает ошибку только при получении сообщения об отмене.

Если я понимаю, как работает merge, вся цепь должна быть завершена через onError, когда getCancellationMessage() испускает ошибку.

Однако, я считаю, что если оператор retryWhen ожидает, что таймер будет излучать при получении сообщения об отказе, ошибка будет проигнорирована, а цикл retryWhen будет продолжаться, как если бы аннулирование не было принято.

Я могу исправить это поведение, объединив Observable.timer() с той же функцией getCancellationMessage(), но я не понимаю, почему я должен это делать в первую очередь.

Это merge/retryWhen Ожидаемое взаимодействие?

редактирует:

Ниже приведен пример такого рода вещи, что функция getCancellationMessage() делает:

Observable<T> getCancellationMessage() { 
    if (this.messageStream == null) { 
     this.messageStream = this.messageConsumer.toObservable() 
          .flatMap(message -> { 
           this.messageConsumer.unregister(); 
           if (isCancelMessage(message)) { 
            return Observable.error(new CancelError()); 
           } 
           else { 
            return Observable.error(new FatalError()); 
           } 
          }); 
    } 
    return this.messageStream; 
} 

Следует заметить, что я не являюсь владельцем реализации this.messageConsumer - это вытекает из сторонняя библиотека, которую я использую (vert.x), поэтому я не контролирую реализацию этого Observable.

Как я понимаю, метод messageConsumer.toObservable() возвращает результат Observable.create() снабженным экземпляром this class, который будет вызывать метод onNext абонента всякий раз, когда новое сообщение has arrived.

Звонок в messageConsumer.unregister() предотвращает получение дальнейших сообщений.

+1

Чтение документации, в нем не упоминается enywhere, что 'Observable.timer' вызывает' onError' вообще. И поскольку 'onError' не вызывается таймером, дочерняя подписка также не вызывает' onError', поэтому игнорирует исключения. Честно говоря, мне не хватает глубоких знаний, чтобы ответить на это с уверенностью, но вы пробовали, что предлагает официальная документация [здесь] (http://reactivex.io/RxJava/javadoc/rx/Observable.html#retryWhen (rx .functions.Func1)) –

+0

Я не ожидаю 'Observable.timer()' для создания ошибки, но в то время как 'timer' работает, абсолютно верно, что' getCancellationMessage' выпустил ошибку, и я удивляясь, почему это игнорируется. Даже после истечения таймера, как будто 'getCancellationMessage' никогда не выдавал' Observable.error() '. В документации 'retryWhen' нет ничего, что я нашел бы, чтобы объяснить, что я вижу. – Hoobajoob

+1

Но, глядя на график в url, который я опубликовал раньше, если наблюдаемый из 'retryWith' не вызывает' onError', то в результате наблюдаемый продолжает цикл, который звучит как то, что вы испытываете. –

ответ

2

Однако я нахожу, что если оператор retryWhen ждет таймера испускать при получении сообщения отмены, игнорируется ошибка, и цикл retryWhen продолжается, если отмена не был получен.

Оператор retryWhen оказывается выше по потоку Throwable в значение и направляет его через последовательность вы предоставили для того, чтобы получить ответ значение, чтобы повторить попытку вверх по течению или в конце потока, таким образом,

Observable.error(new IOException()) 
.retryWhen((Observable<Throwable> error) -> error) 
.subscribe(); 

Будет ли повторить неопределенно, потому что внутреннее значение error считается значением, а не исключением.

retryWhen не знает сам по себе, какая из error значений следует рассматривать как один, который не должен быть повторен, это работа вашего внутреннего потока:

Observable.defer(() -> getEndpoint()) 
    .mergeWith(getCancellationMessage()) 
    .flatMap(endpoint -> useEndpoint(endpoint)) 
    .retryWhen(obs -> obs 
      .takeWhile(error -> !(error instanceof CancellationException)) // <------- 
      .flatMap(error -> Observable.timer(/* args */)) 
) 
    .subscribe(result -> useResult(result), 
       error -> handleError(error) 
); 

Здесь мы позволяем только ошибка, если она не относится к типу CancellationException (вы можете заменить ее свой тип ошибки). Это завершит последовательность.

Если вы хотите, последовательность, чтобы закончить с ошибкой вместо этого, мы должны изменить логику flatMap вместо:

.retryWhen(obs -> obs 
     .flatMap(error -> { 
      if (error instanceof CancellationException) { 
       return Observable.error(error); 
      } 
      return Observable.timer(/* args */); 
     }) 
) 

Обратите внимание, что возвращение Observable.empty() в flatMap не заканчивается последовательность, как это просто указывает на источник чтобы быть объединенным, пуст, но могут быть и другие внутренние источники. В частности, до retryWhen, empty() будет вешать последовательность неопределенно, потому что не будет никакого сигнала, указывающего повтор или конец последовательности.

Edit:

на основе вашей редакции, я предполагаю, что getCancellationMessage() горячая наблюдаемым. Для наблюдения за событиями или ошибками необходимо наблюдать горячие наблюдаемые. Когда оператор retryWhen находится в своем льготном периоде повтора из-за timer(), ничто не подписано на верхний mergeWith с getCancellationMessage() и, таким образом, он не может остановить таймер в этой точке.

Вы должны держать подписку на него в то время как timer выполняет, чтобы остановить его сразу:

Observable<Object> cancel = getCancellationMessage(); 

Observable.defer(() -> getEndpoint()) 
    .mergeWith(cancel) 
    .flatMap(endpoint -> useEndpoint(endpoint)) 
    .retryWhen(obs -> obs 
     .flatMap(error -> { 
      if (error instanceof CancellationException) { 
       return Observable.error(error); 
      } 
      return Observable.timer(/* args */).takeUntil(cancel); 
     }) 
) 
    .subscribe(result -> useResult(result), 
       error -> handleError(error) 
); 

В этом случае, если cancel срабатывает, когда таймер выполняется, то retryWhen остановит таймер и прекратить с ошибкой отмены немедленно.

Использование takeUntil является одним из вариантов, как вы выяснили, mergeWith (cancel) снова работает.

+0

Благодарим вас за полезное объяснение. Я собираюсь применить ваши рекомендации и проверить, относятся ли они к такому поведению. – Hoobajoob

+0

К сожалению, это не изменяет поведение. Если 'retryWhen' столкнулся с ошибкой, которая заставила его вернуть' Observable.timer() ', то любые ошибки, созданные после этой точки, но до истечения срока действия таймера, игнорируются. 'retryWhen' даже не кажется, что вызывается с этой более поздней ошибкой. – Hoobajoob

+0

Я обновил образец кода соответственно. Все еще запутано. – Hoobajoob

 Смежные вопросы

  • Нет связанных вопросов^_^