2017-01-05 13 views
2

Я новичок в Rx, и я очень благодарен за помощь в обработке ошибок. У меня есть следующий код, который является в основном машинописным:RxJS - Продолжить отправку уведомлений после возникновения ошибки в onNext

var observable = Rx.Observable.fromEvent(searchField, 'keyup') 
    .map(ev => ev.target.value) 
    .filter(text => text.length > 2) 
    .debounce(500 /* ms */) 
    .distinctUntilChanged() 
    .flatMapLatest(getAsyncSearchResults); 

observable.subscribe(function(value) { 
     console.log("onNext"); 
     throw "error"; // THIS TERMINATES THE OBSERVABLE 
    }, 
    function(error) { 
     console.log("Error"); 
    }, 
    function() { 
     console.log("Completed"); 
    }); 

Наблюдаемыми завершается после того, как исключение происходит в функции onNext. Это кажется мне нежелательным поведением, так как я не хочу обертывать весь свой код onNext внутри try-catch. Есть ли способ, которым я мог бы сказать наблюдаемому продолжать выдавать уведомление независимо от того, какое исключение имеет место?

+1

Я не знаю вашу цели для метания ошибки в методе подписки, но это очень не-RxJS-y способ делать что-то. Я бы пересмотрел ваш шаблон и включил обработку реактивной ошибки в конвейер. –

ответ

0

Это поведение Rx на каждой платформе, и единственный способ поймать исключения в наблюдателях - это ... хорошо ... поймать их.

Вы можете создать свой собственный метод subscribeSafe, который обертывает обратный вызов в try/catch, если у вас часто возникает эта необходимость.

В качестве альтернативы, если вы можете переместить часть (ы), которые бросают от наблюдателя к самому потоку, тогда вы можете контролировать поток ошибок и (например) повторить попытку.

Пример ниже:

// simulates (hot) keyup event 
 
const obs1 = Rx.Observable.interval(1000).publish() 
 
obs1.connect() 
 

 
function getAsyncSearchResults(i) { 
 
    return Rx.Observable.timer(500).mapTo(i) 
 
} 
 

 
obs1.switchMap(getAsyncSearchResults).do(x => { 
 
    console.log(x) // all events gets logged 
 
    throw new Error() 
 
}).retry().subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>

3

Вы должны действительно сделать ваши next, completed и error обработчики безопасны для исключений. Не существует разумного способа, чтобы поток источника «возобновил» подписку, когда ошибки подписчика - единственный разумный способ действий - прекратить разговор с подписчиком.

Для полного обсуждения того, почему это мой ответ How to handle exceptions thrown by observer's onNext in RxJava? - который применим точно также и к любой платформе реализации Rx.

Futhermore см Why is the OnError callback never called when throwing from the given subscriber? хорошей пояснительная метафоры (газетный киоск) соображений - в частности, о необходимости рассмотрения может быть другими абонентами, которые не бросили ошибку в своих next обработчиках.

0

Вам необходимо переключиться на новый одноразовый поток, и если в нем возникнет ошибка, он будет безопасно удален и сохранит исходный поток.

Я изменил ваш код в более простой поток 0 - 5, чтобы продемонстрировать легко. Он выдает ошибку/исключение, если число 3.

Rx.Observable.from([0,1,2,3,4,5]) 
 
    .switchMap(value => { 
 

 
     // This is the disposable stream! 
 
     // Errors can safely occur in here without killing the original stream 
 

 
     return Rx.Observable.of(value) 
 
      .map(value => { 
 
       if (value === 3) { 
 
        throw new Error('Value cannot be 3'); 
 
       } 
 
       return value; 
 
      }) 
 
      .catch(error => { 
 
       // You can do some fancy stuff here with errors if you like 
 
       // Below we are just returning the error object to the outer stream 
 
       return Rx.Observable.of(error); 
 
      }); 
 

 
    }) 
 
    .map(value => { 
 
     if (value instanceof Error) { 
 
      // Maybe do some error handling here 
 
      return `Error: ${value.message}`; 
 
     } 
 
     return value; 
 
    }) 
 
    .subscribe(
 
     (x => console.log('Success', x)), 
 
     (x => console.log('Error', x)), 
 
     (() => console.log('Complete')) 
 
    );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"></script>

Более подробная информация об этой технике в этом блоге: The Quest for Meatballs: Continue RxJS Streams When Errors Occur