Я начал изучать использование реактивных расширений с EventStore. В качестве доказательства концепции я хотел бы узнать, могу ли я заставить Rx потреблять поток событий и выводить количество событий, сгруппированных по типу, для окна в одну секунду.Rx: количество сгруппированных событий в переходящем окне
Так, говорят, что я потребляя поток с именем «заказы», я хотел бы увидеть что-то вроде следующего появиться в консоли:
OrderCreated 201
OrderUpdated 111
(а второй проходы ..)
OrderCreated 123
OrderUpdated 132
И так далее.
До сих пор я мог получить вывод из всех событий в секунду. Но, похоже, они не могут группировать их по типу события.
кода я использую основан на gist Джеймсе Нуджент:
internal class EventStoreRxSubscription
{
public Subject<ResolvedEvent> ResolvedEvents { get; }
public Subject<SubscriptionDropReason> DroppedReasons { get; }
public EventStoreSubscription Subscription { get; }
public EventStoreRxSubscription(EventStoreSubscription subscription, Subject<ResolvedEvent> resolvedEvent, Subject<SubscriptionDropReason> droppedReasons)
{
Subscription = subscription;
ResolvedEvents = resolvedEvent;
DroppedReasons = droppedReasons;
}
}
static class EventStoreConnectionExtensions
{
public static Task<EventStoreRxSubscription> SubscribeTo(this IEventStoreConnection connection, string streamName, bool resolveLinkTos)
{
return Task<EventStoreRxSubscription>.Factory.StartNew(() => {
var resolvedEvents = new Subject<ResolvedEvent>();
var droppedReasons = new Subject<SubscriptionDropReason>();
var subscriptionTask = connection.SubscribeToStreamAsync(streamName, resolveLinkTos,
(subscription, @event) => resolvedEvents.OnNext(@event),
(subscription, dropReason, arg3) => droppedReasons.OnNext(dropReason));
subscriptionTask.Wait();
return new EventStoreRxSubscription(subscriptionTask.Result, resolvedEvents, droppedReasons);
});
}
}
class Program
{
static void Main(string[] args)
{
var connection = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113));
connection.ConnectAsync();
var subscriptionTask = connection.SubscribeTo("orders", true);
subscriptionTask.Wait();
var events = subscriptionTask.Result.ResolvedEvents;
var query = events.Timestamp()
.Buffer(TimeSpan.FromSeconds(1))
.Select(e => e.Count);
query.Subscribe(Console.WriteLine);
Console.ReadLine();
}
}
сделал мой обновленный ответ вам помочь с этим? – Oliver
Да. Я просто получил шанс попробовать, и это выглядит хорошо. Большое спасибо! –