如何使用存储引擎持久化 Saga 实例并避免争用情况



>我尝试使用RedisSagaRepository持久化 Saga 实例;我想在负载平衡设置中运行 Saga,所以我无法使用InMemorySagaRepository. 但是,在我切换后,我注意到 Saga 没有处理消费者发布的某些事件。我检查了队列,没有看到任何消息。

我注意到的是,当使用者花费很少或没有时间来处理命令和发布事件时,可能会发生这种情况。 如果我使用InMemorySagaRepository或在Consumer.Consume()中添加Task.Delay(),则不会出现此问题

我用错了吗?

另外,如果我想在负载平衡设置中运行 Saga,并且 Saga 需要使用字典发送多个相同类型的命令来跟踪完整性(类似于处理多个事件的状态转换中的逻辑(。当多个使用者同时发布事件时,如果两个 Sagas 同时处理两个不同的事件,我是否会遇到争用条件?在这种情况下,状态中的字典对象是否会正确设置?

代码可在此处获得

SagaService.ConfigureSagaEndPoint()是我在InMemorySagaRepositoryRedisSagaRepository之间切换的地方

private void ConfigureSagaEndPoint(IRabbitMqReceiveEndpointConfigurator endpointConfigurator)
{
var stateMachine = new MySagaStateMachine();
try
{
var redisConnectionString = "192.168.99.100:6379";
var redis = ConnectionMultiplexer.Connect(redisConnectionString);
///If we switch to RedisSagaRepository and Consumer publish its response too quick,
///It seems like the consumer published event reached Saga instance before the state is updated
///When it happened, Saga will not process the response event because it is not in the "Processing" state
//var repository = new RedisSagaRepository<SagaState>(() => redis.GetDatabase());
var repository = new InMemorySagaRepository<SagaState>();
endpointConfigurator.StateMachineSaga(stateMachine, repository);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}

LeafConsumer.Consumption是我们添加Task.Delay((的地方。

public class LeafConsumer : IConsumer<IConsumerRequest>
{
public async Task Consume(ConsumeContext<IConsumerRequest> context)
{
///If MySaga project is using RedisSagaRepository, uncomment await Task.Delay() below
///Otherwise, it seems that the Publish message from Consumer will not be processed
///If using InMemorySagaRepository, code will work without needing Task.Delay
///Maybe I am doing something wrong here with these projects
///Or in real life, we probably have code in Consumer that will take a few milliseconds to complete
///However, we cannot predict latency between Saga and Redis
//await Task.Delay(1000);
Console.WriteLine($"Consuming CorrelationId = {context.Message.CorrelationId}");
await context.Publish<IConsumerProcessed>(new
{
context.Message.CorrelationId,
});
}
}

当您以这种方式发布事件,并且将多个服务实例与非事务性 saga 存储库(例如 Redis(一起使用时,您需要设计您的 saga,以便 Redis 使用和强制执行唯一标识符。这可以防止创建同一传奇的多个实例。

您还需要接受处于超过"预期"状态的事件。例如,期望接收 Start(这会将 saga 置于处理状态(之前,仅在处理中接收另一个事件可能会失败。建议允许 saga 由任何事件序列启动(最初,在 Automatonymous 中(,以避免无序消息传递问题。只要所有事件都使表盘从左向右移动,就会达到最终状态。如果在较晚的事件之后收到较早的事件,则不应向后移动状态(在本例中为向左移动(,而应仅向 saga 实例添加信息并将其保留为较晚的状态。

如果在单独的服务实例上处理两个事件,它们都将尝试将 saga 实例插入 Redis,这将作为重复项失败。然后,消息应重试(将 UseMessageRetry(( 添加到接收终结点(,然后选取现在存在的 saga 实例并应用事件。

最新更新