2017-01-20 22 views
1

У меня есть метод расширения.Испытание модуля реактивного метода расширения с таймером задержки

public static IObservable<T> RetryWithCount<T>(this IObservable<T> source, 
      int retryCount, int delayMillisecondsToRetry, IScheduler executeScheduler = null, 
      IScheduler retryScheduler = null) 
     { 
      var retryAgain = retryCount + 1; 
      return source 
       .RetryX(
        (retry, exception) => 
         retry == retryAgain 
          ? Observable.Throw<bool>(exception) 
          : Observable.Timer(TimeSpan.FromMilliseconds(delayMillisecondsToRetry)) 
           .Select(_ => true)); 
     } 

RetryX еще один метод расширения и я могу модульное тестирование отлично. Проблема с указанным выше методом заключается в том, что я возвращаю Observable.Timer, и это утверждение вызывается во второй раз.

Метод испытания единицы измерения.

 [Test] 
     public void should_retry_with_count() 
     { 
      // Arrange 
      var tries = 0; 
      var scheduler = new TestScheduler(); 
      IObservable<Unit> source = Observable.Defer(() => 
      { 
       ++tries; 
       return Observable.Throw<Unit>(new Exception()); 
      }); 

      // Act 
      var subscription = source.RetryWithCount(1, 100, scheduler, scheduler) 
       .Subscribe(
        _ => { }, 
        ex => { }); 
      scheduler.AdvanceByMinimal(); //How to make sure that it is completed? 

      // Assert 
      Assert.IsTrue(tries == 2); // Assert is invoked before the source has completed. 
     } 

Метод помощи AdvanceByMinimal.

public static void AdvanceMinimal(this TestScheduler @this) => @this.AdvanceBy(TimeSpan.FromMilliseconds(1)); 

Успешный модульный тест для метода продления RetryX приведен ниже.

 [Test] 
     public void should_retry_once() 
     { 
      // Arrange 
      var tries = 0; 
      var scheduler = new TestScheduler(); 
      var source = Observable 
       .Defer(
        () => 
        { 
         ++tries; 
         return Observable.Throw<Unit>(new Exception()); 
        }); 
      var retryAgain = 2; 

      // Act 
      source.RetryX(
       (retry, exception) => 
       { 
        var a = retry == retryAgain 
         ? Observable.Return(false) 
         : Observable.Return(true); 

        return a; 
       }, scheduler, scheduler) 
       .Subscribe(
        _ => { }, 
        ex => { }); 
      scheduler.AdvanceMinimal(); 

      // Assert 
      Assert.IsTrue(tries == retryAgain); 
     } 

И для ясности общего рисунка ниже применяется метод расширения RetryX.

 /// <summary> 
     /// Retry the source using a separate Observable to determine whether to retry again or not. 
     /// </summary> 
     /// <typeparam name="T"></typeparam> 
     /// <param name="source"></param> 
     /// <param name="retryObservable">The observable factory used to determine whether to retry again or not. Number of retries & exception provided as parameters</param> 
     /// <param name="executeScheduler">The scheduler to be used to observe the source on. If non specified MainThreadScheduler used</param> 
     /// <param name="retryScheduler">The scheduler to use for the retry to be observed on. If non specified MainThreadScheduler used.</param> 
     /// <returns></returns> 
     public static IObservable<T> RetryX<T>(this IObservable<T> source, 
      Func<int, Exception, IObservable<bool>> retryObservable, IScheduler executeScheduler = null, 
      IScheduler retryScheduler = null) 
     { 
      if (retryObservable == null) 
      { 
       throw new ArgumentNullException(nameof(retryObservable)); 
      } 

      if (executeScheduler == null) 
      { 
       executeScheduler = MainScheduler; 
      } 

      if (retryScheduler == null) 
      { 
       retryScheduler = MainScheduler; 
      } 

      // so, we need to subscribe to the sequence, if we get an error, then we do that again... 
      return Observable.Create<T>(o => 
      { 
       // whilst we are supposed to be running, we need to execute this 
       var trySubject = new Subject<Exception>(); 

       // record number of times we retry 
       var retryCount = 0; 

       return trySubject. 
        AsObservable(). 
        ObserveOn(retryScheduler). 
        SelectMany(e => Observable.Defer(() => retryObservable(retryCount, e))). // select the retry logic 
        StartWith(true). // prime the pumps to ensure at least one execution 
        TakeWhile(shouldTry => shouldTry). // whilst we should try again 
        ObserveOn(executeScheduler). 
        Select(g => Observable.Defer(source.Materialize)). // get the result of the selector 
        Switch(). // always take the last one 
        Do((v) => 
        { 
         switch (v.Kind) 
         { 
          case NotificationKind.OnNext: 
           o.OnNext(v.Value); 
           break; 

          case NotificationKind.OnError: 
           ++retryCount; 
           trySubject.OnNext(v.Exception); 
           break; 

          case NotificationKind.OnCompleted: 
           trySubject.OnCompleted(); 
           break; 
         } 
        } 
        ).Subscribe(_ => { }, o.OnError, o.OnCompleted); 
      }); 
     } 

ответ

2

Это не ответ на ваш вопрос, а что-то, что может помочь вам: я смотрел на это RetryX на некоторое время, и если вы вырезать все scheduler вещи, которые вы, вероятно, следует, что можно уменьшить это:

public static IObservable<T> RetryX<T>(this IObservable<T> source, Func<int, Exception, IObservable<bool>> retryObservable) 
{ 
    return source.Catch((Exception e) => retryObservable(1, e) 
     .Take(1) 
     .SelectMany(b => b ? source.RetryX((count, ex) => retryObservable(count + 1, ex)) : Observable.Empty<T>())); 
} 

Все вызовы планировщика не являются «лучшей практикой». Причина в том, что большинство операторов Rx не принимают параметры планировщика (Select, Where, Catch и т. Д.). Те, что делают, имеют что-то конкретное, связанное с синхронизацией/планированием: Timer, Delay, Join.

Кто-то, кто заинтересован в указании планировщика на использование без планировщика RetryX, всегда может указать планировщик по переданным параметрам. Обычно вы хотите, чтобы управление потоками находилось в вызывающем потоке верхнего уровня и указывало поток планирование не там, где вы хотите это сделать.

+0

Гораздо лучше, чем я написал! Просто. –

0

Проблема не передавая IScheduler правильно вниз к методу расширения RetryX, а также Observable.Timer.

public static IObservable<T> RetryWithCount<T>(this IObservable<T> source, 
      int retryCount, int delayMillisecondsToRetry, IScheduler executeScheduler = null, 
      IScheduler retryScheduler = null) 
     { 
      if (executeScheduler == null) 
      { 
       executeScheduler = MainScheduler; 
      } 
      var retryAgain = retryCount + 1; 
      return source 
       .RetryX(
        (retry, exception) => 
        { 
         return retry == retryAgain 
          ? Observable.Throw<bool>(exception, executeScheduler) 
          : Observable.Timer(TimeSpan.FromMilliseconds(delayMillisecondsToRetry), executeScheduler) 
           .Select(_ => true); 
        }, 
        retryScheduler, 
        executeScheduler); 
     } 
1

George check out https://github.com/kentcb/Genesis.RetryWithBackoff от Kent для вдохновения.

+0

Да, Кен на самом деле ответил мне, как только увидел мой пост. Я не удалял вопрос для справочных целей :-) –