2016-02-29 1 views
3

Я начал изучать использование реактивных расширений с EventStore. В качестве доказательства концепции я хотел бы узнать, могу ли я заставить Rx потреблять поток событий и выводить количество событий, сгруппированных по типу, для окна в одну секунду.Rx: количество сгруппированных событий в переходящем окне

Так, говорят, что я потребляя поток с именем «заказы», ​​я хотел бы увидеть что-то вроде следующего появиться в консоли:

OrderCreated 201 
OrderUpdated 111 

(а второй проходы ..)

OrderCreated 123 
OrderUpdated 132 

И так далее.

До сих пор я мог получить вывод из всех событий в секунду. Но, похоже, они не могут группировать их по типу события.

кода я использую основан на gist Джеймсе Нуджент:

internal class EventStoreRxSubscription 
{ 
    public Subject<ResolvedEvent> ResolvedEvents { get; } 

    public Subject<SubscriptionDropReason> DroppedReasons { get; } 
    public EventStoreSubscription Subscription { get; } 

    public EventStoreRxSubscription(EventStoreSubscription subscription, Subject<ResolvedEvent> resolvedEvent, Subject<SubscriptionDropReason> droppedReasons) 
    { 
     Subscription = subscription; 
     ResolvedEvents = resolvedEvent; 
     DroppedReasons = droppedReasons; 
    } 
} 

static class EventStoreConnectionExtensions 
{ 
    public static Task<EventStoreRxSubscription> SubscribeTo(this IEventStoreConnection connection, string streamName, bool resolveLinkTos) 
    { 
     return Task<EventStoreRxSubscription>.Factory.StartNew(() => { 

      var resolvedEvents = new Subject<ResolvedEvent>(); 
      var droppedReasons = new Subject<SubscriptionDropReason>(); 

      var subscriptionTask = connection.SubscribeToStreamAsync(streamName, resolveLinkTos, 
                    (subscription, @event) => resolvedEvents.OnNext(@event), 
                    (subscription, dropReason, arg3) => droppedReasons.OnNext(dropReason)); 
      subscriptionTask.Wait(); 

      return new EventStoreRxSubscription(subscriptionTask.Result, resolvedEvents, droppedReasons); 
     }); 
    } 
} 

class Program 
{ 
    static void Main(string[] args) 
    { 
     var connection = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113)); 
     connection.ConnectAsync(); 

     var subscriptionTask = connection.SubscribeTo("orders", true); 
     subscriptionTask.Wait(); 

     var events = subscriptionTask.Result.ResolvedEvents; 

     var query = events.Timestamp() 
       .Buffer(TimeSpan.FromSeconds(1)) 
       .Select(e => e.Count); 

     query.Subscribe(Console.WriteLine); 

     Console.ReadLine(); 
    } 
} 
+0

сделал мой обновленный ответ вам помочь с этим? – Oliver

+1

Да. Я просто получил шанс попробовать, и это выглядит хорошо. Большое спасибо! –

ответ

2

я сделал что-то похожее на это раньше, я использовал Throttle сгруппировать все события в пределах заданной частоты, однако вы можете использовать Buffer, чтобы получить счет/коллекцию за каждый период.

В приведенном ниже примере представлен абстрактный пример того, как я достиг этого, где AggregateType и AggregateFunction будут заменены вашим собственным типом и агрегацией.

GroupByUntil позволяет группировать по типу в течение установленного периода.

subscription = observable 
    .GroupByUntil(e => e.Key, e => e.Buffer(TimeSpan.FromSeconds(1))) 
    .SelectMany(e => e.Aggregate(new AggregateType(), (a, e) => AggregateFunction(a, e)) 
    .Subscribe(onNext, onError, onCompleted); 

EDIT

Ниже приведен краткий пример, я постучал, чтобы показать, как это может быть сделано

public class EventType 
{ 
    public string Type { get; set; } 
} 

public class AggregatedType 
{ 
    public string EventType { get; set; } 
    public int Count { get; set; } 
} 

class Program 
{ 
    public delegate void ExampleEventHandler(EventType e); 

    public static event ExampleEventHandler Event; 

    static void Main(string[] args) 
    { 
     var subscription = Observable.FromEvent<ExampleEventHandler, EventType>(e => Event += e, e => Event -= e) 
      .GroupByUntil(e => e.Type, e => e.Buffer(TimeSpan.FromSeconds(1))) 
      .SelectMany(e => e 
       .Select(ev => new AggregatedType { EventType = ev.Type }) 
       .Aggregate(new AggregatedType(), (a, ev) => new AggregatedType { EventType = ev.EventType, Count = a.Count + 1 })) 
      .Subscribe(OnAggregaredEvent, OnException, OnCompleted); 

     Event(new EventType { Type = "A" }); 
     Event(new EventType { Type = "A" }); 
     Event(new EventType { Type = "B" }); 
     Event(new EventType { Type = "B" }); 

     SpinWait.SpinUntil(()=> false, TimeSpan.FromSeconds(2)); 

     Event(new EventType { Type = "A" }); 
     Event(new EventType { Type = "A" }); 
     Event(new EventType { Type = "B" }); 
     Event(new EventType { Type = "B" }); 

     Console.ReadLine(); 
    } 

    static void OnAggregaredEvent(AggregatedType aggregated) 
    { 
     Console.WriteLine("Type: {0}, Count: {1}", aggregated.EventType, aggregated.Count); 
    } 

    static void OnException(Exception ex) 
    { 
    } 

    static void OnCompleted() 
    { 
    } 
} 
+0

Спасибо. Я попытался превратить ваш абстрактный пример в код, но я действительно стараюсь получить что-то работоспособное. Кажется, что AggregateFunction принимает каждого члена последовательности, тогда как я хотел бы получить количество всех членов для каждого типа события. –

+0

@DavidBrower см. Мое редактирование для рабочего примера – Oliver