2016-11-25 8 views
1

У меня есть наблюдаемый поток, который рассылает события:Как моделировать «повторяющийся таймаут» в реале?

---*--*--*---*------------------------*---*---***--**----- 

Я хочу обернуть этот поток в другой, который добавит «пожалуйста держать» события каждые N секунд, если не было событие в прошлом N секунды:

Желаемый результат: (иксы вновь введенные события

---*--*--*---*---X---X---X---X---X---X*---*---***--**---X- 

Я пытаюсь моделировать ее в идиоматического реактивным способом, но не нашел оператора, который позволит мне сделать это легко. Тайм-аут - тот, который, кажется, приходит закрывается, за исключением того, что он не повторяется, и он фактически завершает поток.

ответ

0

Это немного «взломанное» решение. Непонятно, что делает код.

public static IObservable<T> RepeatingTimeout<T>(ITestableObservable<T> events, TimeSpan timespan, T timeoutEvent, TestScheduler scheduler) 
{ 
    var groups = events 
     .GroupByUntil(x => 1, g => g, g => g.Throttle(timespan, scheduler)) // group events into observables so that each observable contains sequence of events where each event is sooner than timespan 
     .Select(x => 
      x.Concat(Observable.Interval(timespan, scheduler).Select(_ => timeoutEvent) // concatenate timeout events at intervals after each group of events 
      .StartWith(timeoutEvent)) 
      ) 
     .Switch(); // return events from latest observable 
    return groups; 
} 

И это тест, который показывает, что он работает:

[Test] 
public void RepeatedTimeout() 
{ 
    var scheduler = new TestScheduler(); 

    var events = scheduler.CreateHotObservable(
     OnNext(4, 1), 
     OnNext(7, 2), 
     OnNext(10, 3), 
     OnNext(14, 4), 
     OnNext(30, 5), 
     OnNext(35, 6), 
     OnCompleted<int>(36) 
     ); 

    var timespan = TimeSpan.FromTicks(5); 
    var timeoutEvent = -1; 

    var groups = RepeatingTimeout(events, timespan, timeoutEvent, scheduler); 

    groups.Do(x => Console.WriteLine("G " + scheduler.Clock + " " + x)); 


    var results = scheduler.Start(() => groups, 0, 0, 100); 
} 
+0

благодарит за Помогите. Этот тест ничего не печатает для меня? –

0

Если кто-то бежит в эту проблему позже, это решение, которое я закончил с:

public static class ObservableExtensions 
{ 
    public static IObservable<T> PulseDuringInactivity<T>(this IObservable<T> source, TimeSpan pulseInterval, IScheduler scheduler = null) 
    { 
     return source 
      //for each incoming event, we create a new observable stream, that contains this event being repeated infinitely many times over at our pulse interval 
      //these repeating values will serve as the pulse 
      .Select(sourceValue => RepeatingObservable(sourceValue, pulseInterval, scheduler)) 

      //However obviously that creates way too many events. What we we'll do is throw away all of these repeating observables, as soon as there's a single event coming in on any of our streams. 
      //this is what .Switch() does, and it immediately unsubscribes from the now-irrelevant streams, which can then be gc'd. For more information about the workings of this technique, 
      //check out the tests in ObservableExtensionTests.cs 
      .Switch(); 
    } 

    static IObservable<T> RepeatingObservable<T>(T valueToRepeat, TimeSpan repeatInterval, IScheduler scheduler) 
    { 
     return Observable.Interval(repeatInterval, scheduler ?? Scheduler.Default).Select(_ => valueToRepeat); 
    } 
}