我已经开始考虑将反应式扩展与EventStore一起使用。作为概念证明,我想看看我是否可以让 Rx 使用事件流并输出按类型分组的事件计数,为期一秒钟。
因此,假设我正在使用名为"orders"的流,我希望在控制台中看到类似以下内容的内容:
OrderCreated 201
OrderUpdated 111
(一秒钟过去了。
OrderCreated 123
OrderUpdated 132
等等。
到目前为止,我已经能够获得每秒所有事件计数的输出。但似乎无法按事件类型对它们进行分组。
我使用的代码基于James Nugent的要点:
internal class EventStoreRxSubscription
{
public Subject<ResolvedEvent> ResolvedEvents { get; }
public Subject<SubscriptionDropReason> DroppedReasons { get; }
public EventStoreSubscription Subscription { get; }
public EventStoreRxSubscription(EventStoreSubscription subscription, Subject<ResolvedEvent> resolvedEvent, Subject<SubscriptionDropReason> droppedReasons)
{
Subscription = subscription;
ResolvedEvents = resolvedEvent;
DroppedReasons = droppedReasons;
}
}
static class EventStoreConnectionExtensions
{
public static Task<EventStoreRxSubscription> SubscribeTo(this IEventStoreConnection connection, string streamName, bool resolveLinkTos)
{
return Task<EventStoreRxSubscription>.Factory.StartNew(() => {
var resolvedEvents = new Subject<ResolvedEvent>();
var droppedReasons = new Subject<SubscriptionDropReason>();
var subscriptionTask = connection.SubscribeToStreamAsync(streamName, resolveLinkTos,
(subscription, @event) => resolvedEvents.OnNext(@event),
(subscription, dropReason, arg3) => droppedReasons.OnNext(dropReason));
subscriptionTask.Wait();
return new EventStoreRxSubscription(subscriptionTask.Result, resolvedEvents, droppedReasons);
});
}
}
class Program
{
static void Main(string[] args)
{
var connection = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113));
connection.ConnectAsync();
var subscriptionTask = connection.SubscribeTo("orders", true);
subscriptionTask.Wait();
var events = subscriptionTask.Result.ResolvedEvents;
var query = events.Timestamp()
.Buffer(TimeSpan.FromSeconds(1))
.Select(e => e.Count);
query.Subscribe(Console.WriteLine);
Console.ReadLine();
}
}
我以前做过类似的事情,我用Throttle
在设定的频率内对所有事件进行分组,但是您可以使用Buffer
来获取每个周期的计数/集合。
下面的示例提供了一个抽象示例,说明我是如何实现这一点的,其中AggregateType
和AggregateFunction
将被您自己的类型和聚合所取代。
GroupByUntil
允许您在设定的时间段内按类型分组。
subscription = observable
.GroupByUntil(e => e.Key, e => e.Buffer(TimeSpan.FromSeconds(1)))
.SelectMany(e => e.Aggregate(new AggregateType(), (a, e) => AggregateFunction(a, e))
.Subscribe(onNext, onError, onCompleted);
编辑
下面是一个快速示例,我敲出了一个示例,以展示如何做到这一点
public class EventType
{
public string Type { get; set; }
}
public class AggregatedType
{
public string EventType { get; set; }
public int Count { get; set; }
}
class Program
{
public delegate void ExampleEventHandler(EventType e);
public static event ExampleEventHandler Event;
static void Main(string[] args)
{
var subscription = Observable.FromEvent<ExampleEventHandler, EventType>(e => Event += e, e => Event -= e)
.GroupByUntil(e => e.Type, e => e.Buffer(TimeSpan.FromSeconds(1)))
.SelectMany(e => e
.Select(ev => new AggregatedType { EventType = ev.Type })
.Aggregate(new AggregatedType(), (a, ev) => new AggregatedType { EventType = ev.EventType, Count = a.Count + 1 }))
.Subscribe(OnAggregaredEvent, OnException, OnCompleted);
Event(new EventType { Type = "A" });
Event(new EventType { Type = "A" });
Event(new EventType { Type = "B" });
Event(new EventType { Type = "B" });
SpinWait.SpinUntil(()=> false, TimeSpan.FromSeconds(2));
Event(new EventType { Type = "A" });
Event(new EventType { Type = "A" });
Event(new EventType { Type = "B" });
Event(new EventType { Type = "B" });
Console.ReadLine();
}
static void OnAggregaredEvent(AggregatedType aggregated)
{
Console.WriteLine("Type: {0}, Count: {1}", aggregated.EventType, aggregated.Count);
}
static void OnException(Exception ex)
{
}
static void OnCompleted()
{
}
}