我有一个MassTransit传奇状态机(源自Automatonymous.MassTransitStateMachine),我正在努力解决一个只有当我将端点配置prefetchCount设置为大于1的值时才会出现的问题。
问题是"StartupCompletedEvent"已发布,然后在传奇状态持久化到数据库之前立即进行处理。
状态机配置如下:
State(() => Initialising);
State(() => StartingUp);
State(() => GeneratingFiles);
Event(() => Requested, x => x.CorrelateById(ctx => ctx.Message.CorrelationId).SelectId(ctx => ctx.Message.CorrelationId));
Event(() => StartupCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => InitialisationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => FileGenerationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Initially(
When(Requested)
.ThenAsync(async ctx =>
{
Console.WriteLine("Starting up...");
await ctx.Publish(new StartupCompletedEvent() { CorrelationId = ctx.Instance.CorrelationId }));
Console.WriteLine("Done starting up...");
}
.TransitionTo(StartingUp)
);
During(StartingUp,
When(StartupCompleted)
.ThenAsync(InitialiseSagaInstanceData)
.TransitionTo(Initialising)
);
// snip...!
当我的传奇收到请求事件时会发生什么:
- Initially块的ThenAsync处理程序被命中。在这一点上,没有任何传奇数据被持久化到回购(正如预期的那样)
- StartupCompletedEvent发布到总线。这里也没有将传奇数据保存到回购中
- Initially声明的ThenAsync块完成。在这之后,传奇数据终于得以保存
- 没有其他事情发生
此时,队列中没有消息,StartupCompletedEvent丢失。但是,数据库中有一个传奇实例。
我已经试过启动,并确定其他线程中的一个(因为我的预取>1)已经接收到该事件,在数据库中没有找到任何correlationId的传奇,并丢弃了该事件。因此,在传奇故事有机会持续下去之前,这一事件正在被公布和处理。
如果我在初始处理程序中添加以下内容:
When(StartupCompleted)
.Then(ctx => Console.WriteLine("Got the startup completed event when there is no saga instance"))
然后我执行Console.WriteLine。我对此的理解是,事件已被接收,但已路由到Initially处理程序,因为correlationId不存在传奇。如果我在这一点上设置一个断点,并查看传奇回购,那么还没有传奇。
可能值得一提的还有其他几点:
- 我的传奇回购上下文设置为使用IsolationLevel.Serializable
- 我正在使用EntityFrameworkSagaRepository
- 当预回迁计数设置为1时,一切正常
- 我将Ninject用于DI,并且我的SagaRepository是线程范围的,所以我假设预取计数允许的每个处理程序都有自己的saga存储库副本
- 如果我在一个单独的线程中发布StartupCompletedEvent,在它之前有1000ms的睡眠,那么事情就可以正常工作了。我认为这是因为传奇回购已经完成了对传奇状态的持久化,所以当事件最终被发布并由处理程序获取时,传奇状态会正确地从回购中检索到
如果我遗漏了什么,请告诉我;我试着提供我认为有价值的一切,而不会把这个问题提得太久。。。
我也遇到了这个问题,我想发布Chris的评论作为答案,这样人们就可以找到它。
解决方案是启用"发件箱",以便在传奇故事持久化之前保留消息。
c.ReceiveEndpoint("queue", e =>
{
e.UseInMemoryOutbox();
// other endpoint configuration here
}