2017-02-04 7 views
0

У меня есть следующая проблема: у меня есть тема, которая использует метод OnNext для сигнализации новых событий. Однако может возникнуть исключение, которое имитируется в коде ниже с вызовом метода OnError. Причина ошибки зависит от времени, поэтому, если я попытаюсь повторить одно и то же действие снова несколько раз, это будет успешным. Поэтому я хочу использовать метод catch, который принимает функцию, генерирующую новые последовательности, чтобы повторить действие, скажем, 5 раз. Если всякая повторная попытка завершается неудачей, тогда она должна выдать окончательное исключение, и если хотя бы один из них будет успешным, он должен продолжить работу с OnNext (5) и OnNext (6).Как повторить действие OnError reactive X IObservable несколько раз, и если успешно продолжить

В идеале исключение и последующий вызов OnError следует предотвращать, но это не в моем случае.

Я пробовал несколько сценариев, используя методы Retry, Concat, Catch и т. Д., Но ничего не работало так, как я хотел.

Subject<int> sub = new Subject<int>(); 

var seq = sub.Select(x => 
{ 
    //time dependent operation 
    Console.WriteLine(x); 
    return x; 
}). 
Catch((SeqException<int> ex) => 
{ 
    return Observable.Empty(0); // what sequece to return to achieve the desired behaviour 
}); 
seq.Subscribe(); 

sub.OnNext(1); 
sub.OnNext(2); 
sub.OnNext(3); 
sub.OnError(new SeqException<int>{ Value = 4}); 
sub.OnNext(5); 
sub.OnNext(6); 

seq.Wait(); 

Заранее спасибо.

ответ

1

Дело 1: Если операция, зависящая от времени, находится внутри Select, это то, что вам нужно повторить.

Используйте Observable.Start, чтобы преобразовать thunk в наблюдаемый. Затем вы можете использовать все операторы восстановления для объявления вашего поведения.

Subject<int> sub = new Subject<int>();  

var seq = sub.SelectMany(x =>    
     Observable.Start(() => 
     { 
      //time dependent failure 
      if (DateTime.Now.Second % 2 == 0) 
       throw new Exception(); 

      Console.WriteLine(x); 
      return x; 
     }) 
     .Retry(5) 
     .Catch(Observable.Return(x)) 
    ); 

//for testing 
Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => (int)x).Subscribe(sub); 

seq.Wait(); 

Случай 2: Вы на самом деле получить сигнал через Subject.

Контракт Rx требует, чтобы после OnComplete | OnError не было больше уведомлений. И, получив эту ошибку, субъект располагает всеми своими подписками, и трубопровод срывается. Вы должны привести его к делу 1, чтобы он работал.

+0

привет, ваш пример довольно интересен, хотя ваш случай 2 к сожалению, правильно, поэтому мне придется подумать о том, как решить эту проблему. Если больше ответов не появится, я соглашусь с этим, спасибо –

+0

Если сторонний код принимает 'ISubject', возможно, вы можете сделать один раз, который, наконец, испустит« Уведомление ». – Asti

1

Вы всегда можете обернуть что-либо блоком повторных попыток. Просто пример для действия с одним аргументом.

Action<T> RetryAction<T>(Action<T> action, int retries) 
{ 
    return arg => { 
     int count = 0; 
     while (true) 
     { 
      try 
      { 
       action(arg); 
       return; 
      } 
      catch 
      { 
       if (count == retries) 
        throw; 
       count++; 
      } 
     } 
    } 
} 

Поэтому вам не нужно менять логику обработки последовательности.

+0

проблема заключается в том, что исключение всегда будет сигнализироваться с помощью метода OnError, а затем распространено на мою часть кода. У меня нет контроля над вызовом OnError. Я могу решить только обработку, так что этот пример не решает мои проблемы unfortunatelly –

+0

Ну, исключение вызывается после n повторений, а затем вы получите, что с OnError –