2017-02-16 8 views
0

Существует PublishProcessor, который вызывает оператор .doOnSubscribe(checkCondition) для выполнения определенной проверки. checkCondition предназначен для выброса UnsupportedOperationException, который затем должен быть передан подписчику вниз по потоку до onError(). Вместо этого вызывается UndeliverableException и происходит сбой процесса.Исключение не распространяется на onError() при броске из doOnSubscribe()

publishProcessor 
    .filter(() -> { // predicate }) 
    .observeOn(scheduler) 
    .doOnSubscribe(checkCondition) 
    .to((sourceFlowable) -> new FancyFlowable(sourceFlowable))) 
    .safeSubscribe(subscriber); 

У кого-нибудь есть идеи, что здесь происходит не так? Почему исключение, вызванное checkCondition, не распространяется на onError абонента?

+0

Наблюдаемого поведение связанно с ошибкой. Исправьте [# 5103] (https://github.com/ReactiveX/RxJava/pull/5103). – akarnokd

ответ

0

Возможно, потому что это не имеет смысла; Выполнение doOnSubscribe выполняется до того, как наблюдаемая цепочка была правильно инициализирована. Вы считали что-то вроде этого?

if(checkCondition) { 
    Observable.error(...).safeSubscribe(subscriber); 
} 

И затем продолжить с исходным кодом (sans doOnSubscribe).

+0

Условие может измениться после того, как Observable был подготовлен и до того, как абонент подписывается, поэтому проверка состояния аванса не решит проблему. –

+1

Оооо, я знаю! 'Observable.defer (() -> checkCondition? Error (...): publishProcessor)' и нормально работает. –

1

Это ошибка с оператором RxJava 2's doOnSubscribe. Если сбой обратного вызова onSubscribe, Throwable в этой точке может быть запрограммирован через onError на законных основаниях. Я скоро опубликую исправление.

В настоящем время, this comment может быть обходным путем для вас:

Observable.defer(() -> checkCondition ? error(...) : publishProcessor)