使用JOliver EventStore 3.0,开始使用简单的示例。
我有一个使用NServiceBus的简单发布/子CQRS实现。客户端在总线上发送命令,域服务器接收并处理命令,并将事件存储到事件存储,然后由事件存储的调度器在总线上发布。读取模型服务器随后订阅这些事件以更新读取模型。没有什么花哨的,基本上是照本宣科的。
它是有效的,但只是在简单的测试中,当事件存储到EventStore时,我在域服务器上(间歇性)收到了很多并发异常。它正确地重试,但有时会达到5次重试的限制,命令最终会出现在错误队列中。
我可以从哪里开始调查并发异常的原因?我删除了调度器,只专注于存储事件,它也有同样的问题。
我使用RavenDB来持久化我的EventStore。我没有做任何花哨的事情,只是这个:
using (var stream = eventStore.OpenStream(entityId, 0, int.MaxValue))
{
stream.Add(new EventMessage { Body = myEvent });
stream.CommitChanges(Guid.NewGuid());
}
异常的堆栈跟踪如下所示:
2012-03-17 18:34:01166〔工人.14〕警告NServiceBus.Unicast.UnicastBus[(null)]<(null)>-EmployeeCommandHandler处理消息失败。EventStore.ConcurrentException:类型为的异常引发了"EventStore.CurrentException"。在中的EventStore.OptisticPipelineHook.PreCommit(提交尝试)c: \Code\public\EventStore\src\proj\EventStore.Core\OptimisticPipelineHook.cs:line55在中的EventStore.OptisticEventStore.Commit(提交尝试)c: \Code\public\EventStore\src\proj\EventStore.Core\OptimisticEventStore.cs:lineEventStore.OptisticEventStream.PersistChanges(GuidcommitId)c: \Code\public\EventStore\src\proj\EventStore.Core\OptimisticEventStream.cs:line168在EventStore.OptisticEventStream.CommitChanges(GuidcommitId)c: \Code\public\EventStore\src\proj\EventStore.Core\OptimisticEventStream.cs:line149,位于CQRSTest3.Domain.Extensions.StoreEvent(IStoreEvents中的eventStore、Guid entityId、Object evt)C: \dev\test\CRSTest3\CRSTest3.Domain\Extensions.cs:line 13 atCQRSTest3.Domain.ComandHandlers.EmployeCommandHandler.Handle(ChangeEmployeeSalary消息)C: \dev\test\CRSTest3\CRSTest3.Domain\ComandHandlers\AmployeeCommandHandler.cs:line 55
我想明白了。尽管如此,还是得翻遍源代码才能找到它。我希望有更好的记录!这是我的新事件商店连线:
EventStore = Wireup.Init()
.UsingRavenPersistence("RavenDB")
.ConsistentQueries()
.InitializeStorageEngine()
.Build();
我不得不添加.ConsistentQueries(),以便raven持久性提供程序在内部对eventstore向raven进行的查询使用WaitForNonStaleResults。
基本上,当我添加一个新事件,然后在raven赶上索引之前尝试添加另一个事件时,流修订不是最新的。第二个事件将取代第一个事件。