Я использую RxJava в асинхронной передачи сообщений среде (vert.x), и поэтому имеют поток, который выглядит следующим образом:retryWhen с таймером появляется ниспровергать поведение на Merge
Observable.defer(() -> getEndpoint())
.mergeWith(getCancellationMessage())
.flatMap(endpoint -> useEndpoint(endpoint))
.retryWhen(obs -> obs.flatMap(error -> {
if (wasCancelled(error)) {
return Observable.error(error);
}
return Observable.timer(/* args */)
}))
.subscribe(result -> useResult(result),
error -> handleError(error)
);
Реализация getCancellationMessage()
возвращает наблюдаемый поток, который испускает ошибку всякий раз, когда сообщение отмены было получено от независимого источника сообщения. Этот поток никогда не испускает ничего, кроме Observable.error()
, и он испускает ошибку только при получении сообщения об отмене.
Если я понимаю, как работает merge
, вся цепь должна быть завершена через onError
, когда getCancellationMessage()
испускает ошибку.
Однако, я считаю, что если оператор retryWhen
ожидает, что таймер будет излучать при получении сообщения об отказе, ошибка будет проигнорирована, а цикл retryWhen
будет продолжаться, как если бы аннулирование не было принято.
Я могу исправить это поведение, объединив Observable.timer()
с той же функцией getCancellationMessage()
, но я не понимаю, почему я должен это делать в первую очередь.
Это merge
/retryWhen
Ожидаемое взаимодействие?
редактирует:
Ниже приведен пример такого рода вещи, что функция getCancellationMessage()
делает:
Observable<T> getCancellationMessage() {
if (this.messageStream == null) {
this.messageStream = this.messageConsumer.toObservable()
.flatMap(message -> {
this.messageConsumer.unregister();
if (isCancelMessage(message)) {
return Observable.error(new CancelError());
}
else {
return Observable.error(new FatalError());
}
});
}
return this.messageStream;
}
Следует заметить, что я не являюсь владельцем реализации this.messageConsumer
- это вытекает из сторонняя библиотека, которую я использую (vert.x), поэтому я не контролирую реализацию этого Observable.
Как я понимаю, метод messageConsumer.toObservable()
возвращает результат Observable.create()
снабженным экземпляром this class, который будет вызывать метод onNext
абонента всякий раз, когда новое сообщение has arrived.
Звонок в messageConsumer.unregister()
предотвращает получение дальнейших сообщений.
Чтение документации, в нем не упоминается enywhere, что 'Observable.timer' вызывает' onError' вообще. И поскольку 'onError' не вызывается таймером, дочерняя подписка также не вызывает' onError', поэтому игнорирует исключения. Честно говоря, мне не хватает глубоких знаний, чтобы ответить на это с уверенностью, но вы пробовали, что предлагает официальная документация [здесь] (http://reactivex.io/RxJava/javadoc/rx/Observable.html#retryWhen (rx .functions.Func1)) –
Я не ожидаю 'Observable.timer()' для создания ошибки, но в то время как 'timer' работает, абсолютно верно, что' getCancellationMessage' выпустил ошибку, и я удивляясь, почему это игнорируется. Даже после истечения таймера, как будто 'getCancellationMessage' никогда не выдавал' Observable.error() '. В документации 'retryWhen' нет ничего, что я нашел бы, чтобы объяснить, что я вижу. – Hoobajoob
Но, глядя на график в url, который я опубликовал раньше, если наблюдаемый из 'retryWith' не вызывает' onError', то в результате наблюдаемый продолжает цикл, который звучит как то, что вы испытываете. –