使用预取运行的公共交通传奇 > 1



我有一个MassTransit传奇状态机(源自Automatonymous.MassTransitStateMachine),我正在努力解决一个只有当我将端点配置prefetchCount设置为大于1的值时才会出现的问题。

问题是"StartupCompletedEvent"已发布,然后在传奇状态持久化到数据库之前立即进行处理。

状态机配置如下:

State(() => Initialising);
State(() => StartingUp);
State(() => GeneratingFiles);
Event(() => Requested, x => x.CorrelateById(ctx => ctx.Message.CorrelationId).SelectId(ctx => ctx.Message.CorrelationId));
Event(() => StartupCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => InitialisationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => FileGenerationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));

Initially(
    When(Requested)
        .ThenAsync(async ctx => 
        {
          Console.WriteLine("Starting up...");
          await ctx.Publish(new StartupCompletedEvent() { CorrelationId = ctx.Instance.CorrelationId }));
          Console.WriteLine("Done starting up...");
        }
        .TransitionTo(StartingUp)
);

During(StartingUp,
    When(StartupCompleted)
        .ThenAsync(InitialiseSagaInstanceData)
        .TransitionTo(Initialising)
);
// snip...!

当我的传奇收到请求事件时会发生什么:

  1. Initially块的ThenAsync处理程序被命中。在这一点上,没有任何传奇数据被持久化到回购(正如预期的那样)
  2. StartupCompletedEvent发布到总线。这里也没有将传奇数据保存到回购中
  3. Initially声明的ThenAsync块完成。在这之后,传奇数据终于得以保存
  4. 没有其他事情发生

此时,队列中没有消息,StartupCompletedEvent丢失。但是,数据库中有一个传奇实例。

我已经试过启动,并确定其他线程中的一个(因为我的预取>1)已经接收到该事件,在数据库中没有找到任何correlationId的传奇,并丢弃了该事件。因此,在传奇故事有机会持续下去之前,这一事件正在被公布和处理。

如果我在初始处理程序中添加以下内容:

When(StartupCompleted)
    .Then(ctx => Console.WriteLine("Got the startup completed event when there is no saga instance"))

然后我执行Console.WriteLine。我对此的理解是,事件已被接收,但已路由到Initially处理程序,因为correlationId不存在传奇。如果我在这一点上设置一个断点,并查看传奇回购,那么还没有传奇。

可能值得一提的还有其他几点:

  1. 我的传奇回购上下文设置为使用IsolationLevel.Serializable
  2. 我正在使用EntityFrameworkSagaRepository
  3. 当预回迁计数设置为1时,一切正常
  4. 我将Ninject用于DI,并且我的SagaRepository是线程范围的,所以我假设预取计数允许的每个处理程序都有自己的saga存储库副本
  5. 如果我在一个单独的线程中发布StartupCompletedEvent,在它之前有1000ms的睡眠,那么事情就可以正常工作了。我认为这是因为传奇回购已经完成了对传奇状态的持久化,所以当事件最终被发布并由处理程序获取时,传奇状态会正确地从回购中检索到

如果我遗漏了什么,请告诉我;我试着提供我认为有价值的一切,而不会把这个问题提得太久。。。

我也遇到了这个问题,我想发布Chris的评论作为答案,这样人们就可以找到它。

解决方案是启用"发件箱",以便在传奇故事持久化之前保留消息。

c.ReceiveEndpoint("queue", e =>
{
    e.UseInMemoryOutbox();
    // other endpoint configuration here
}

最新更新