2017-02-08 15 views
0

У меня есть код, который медленно транслируется из наблюдаемого. Я хотел бы имитировать живой поток из сохранившихся объектов и их отметку времени через другую наблюдаемую.Как построить один запланированный Наблюдаемый из массива с отметками времени

Я смог что-то сделать, используя Observable.Return.Delay на массиве, а затем слияние с одним наблюдаемым. Я чувствую, что это неправильный подход, и, возможно, для каждого элемента нужен поток, поэтому синхронизация может завершиться неудачно, когда пул потоков будет заполнен.

var data = Enumerable.Range(1, 10); 

var observable = data 
    .Select((x, idx) => Observable.Return(x).Delay(DateTimeOffset.Now.AddSeconds(idx*3))) 
    .Merge(); 

Каков наилучший способ запланировать статические данные?

+0

Это для тестирования? – Shlomo

+0

Это для воспроизведения производственной среды в UAT. – KrisG

+0

Возможно, вы захотите использовать что-то вроде '.Concat' вместо' Merge' else с очень большими наборами для 'data', вы можете просто затопить Планировщик. Это будет работать, только если у вас есть относительные отметки времени. Однако вы, например, просто используете сгенерированный 'idx', который, как я полагаю, не то, что вы на самом деле делаете (вместо этого читайте из журнала или потока событий) –

ответ

0

Много различных ответов на этот, и я не совсем уверен, что является лучшим решением.

Я нашел пару довольно чистых перспективных решений ...

public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver) 
{ 
    return items.Select(x => Observable.Return(x).DelaySubscription(delayResolver(x))).Concat(); 
} 

public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver) 
{ 
    return items.Select(x => Observable.Empty<TSource>().Delay(delayResolver(x)).Concat(Observable.Return(x))).Concat(); 
} 

Вероятно, лучшее решение, которое чувствует, как он использует служебный метод, предназначенный для этого, и поэтому менее вероятно страдать от побочных эффектов, является метод Generate, который Daniel C. Weber referred to. Хотя я не совсем понял это, пока не нашел более подробный ответ this.

public static IObservable<TSource> IntervalSeries<TSource>(this IEnumerable<TSource> items, Func<TSource, TimeSpan> delayResolver) 
{ 
    return Observable.Generate(
     items.GetEnumerator(), 
     x => x.MoveNext(), 
     x => x, 
     x => x.Current, 
     x => delayResolver(x.Current)); 
} 
0

Я думаю, что самый простой способ будет использовать метод Observable.Interval. Если вы используете это для тестирования и нуждаетесь в повторном использовании, вы можете создать метод обработки задержки результатов.

static IObservable<TSource> DelayObservable<TSource>(IEnumerable<TSource> source, TimeSpan delay) 
{ 
    var observableSource = source.ToObservable(); 

    // Use start with to fire one off right away 
    var interval = Observable.Interval(delay).StartWith(0); 

    // Zip so when the enumerable is done interval is auto unsubscribed 
    return interval.Zip(observableSource, (d, S) => S); 
} 

Тогда можно назвать как

var observable = DelayObservable(Enumerable.Range(1, 10), TimeSpan.FromSeconds(3)) 
+0

У меня есть другое время, связанное с каждым элементом в массиве, и вам потребуется конкретная задержка для элемента в массиве. Извините, мой примерный код не прояснил ситуацию. Мне нравится идея застегнуть две наблюдаемые. – KrisG

+0

FYI 'var interval = Observable.Interval (delay) .StartWith (0);' может быть записано как 'var interval = Observable.Timer (TimeSpan.Zero, delay);' –

+0

@KrisG Я запустил ваш код, и он просто помещает 3 секунды между ними. Вы ищете 3 секунды, затем 6 секунд, а затем 9 секундных задержек между каждым выходом? – CharlesNRice

0

Там очень удобный класс как часть пакета Rx вы можете использовать: Timestamped. Во что бы то ни стало, вам не обязательно. Вот пример:

var timestampedObjects = Enumerable.Range(0, 5) 
    .Select(i => Timestamped.Create(i, DateTimeOffset.Now + TimeSpan.FromSeconds(i*i))) 
    .ToList(); 

var observable = timestampedObjects.ToObservable() 
    .SelectMany(t => Observable.Timer(t.Timestamp).Take(1).Select(_ => t.Value)); 

observable.Subscribe(i => Console.WriteLine($"{DateTimeOffset.Now}: {i}")); 

Это почти идентичен к вашему решению, кроме заменял из Delay для Timer. Delay предназначен для сохранения временных разрывов между элементами для полного наблюдаемого, но смены времени всего. Timer предназначен для абсолютного планирования одного элемента.

Select, за которым следует Merge, эквивалентен SelectMany.

Ваше решение должно работать, и я не буду беспокоиться о проблемах с потоками, Rx достаточно эффективен при планировании/объединении ресурсов потоков.

Другой альтернативой является пакет Microsoft.Reactive.Testing, который позволяет планировать вещи в виртуальном времени, часах или даже годах друг от друга, но запускать их мгновенно. Это здорово для модульного тестирования, несколько громоздкое для чего-либо еще.

1

Есть перегрузка Generate, которая принимает точные значения DateTimeOffset, когда испускать элемент. Take a look.

Подпись выглядит сложной, но довольно простой. Вы должны убедиться, что используемые вами временные метки находятся в порядке возрастания, так как функция timestamp только оценивается, когда был отправлен предыдущий элемент.

Откуда вы получаете временные метки? Вы упомянули о другом наблюдаемом. Если бы вы могли улучшить свой пример, я мог бы попытаться создать какой-то рабочий код.

0

Вы можете использовать такой оператор Play и подавать его своими данными - возможно, с перестановкой времени (в будущее). Я решил использовать Notification<T>, поэтому он также будет поддерживать тестирование для завершения и ошибки.

public static IObservable<T> Play<T>(this IEnumerable<Timestamped<Notification<T>>> source, IScheduler scheduler = null) 
{ 
    if (scheduler == null) scheduler = Scheduler.Default; 

    return Observable.Create<T>(observer => new CompositeDisposable(source 
     .Where(tn => tn.Timestamp > scheduler.Now) 
     .Select(tn => scheduler.Schedule(tn.Timestamp,() => tn.Value.Accept(observer))))); 
} 

Тест:

var events = Enumerable.Range(0, 10) 
    .Select(i => Timestamped.Create(i, DateTimeOffset.Now.AddSeconds(3 * i))) 
    .ToList(); 
var sch = new TestScheduler(); 
var testee = events 
    .Select(e => Timestamped.Create(Notification.CreateOnNext(e.Value), e.Timestamp)) 
    .Play(sch); 
var result = new List<Timestamped<int>>(); 
using (testee.Timestamp(sch).Subscribe(i => result.Add(i))) 
    sch.Start(); 
Assert.True(result.SequenceEqual(events));