为什么补偿方法在消费者在 MassTransit RouterSlip 中抛出异常时不调用



我在传奇状态机中构建了一个路由器支路:

var builder = new RoutingSlipBuilder(NewId.NextGuid());
var submitOrderUrl = QueueNames.GetActivityUri(nameof(SubmitOrderActivity));
builder.AddActivity("SubmitOrder", submitOrderUrl, new
{
context.Message.OrderId
});;
builder.AddActivity("Payment", QueueNames.GetActivityUri(nameof(PaymentActivity)), new {
context.Message.OrderId,
context.Message.CustomerId,
context.Message.Credit
});

builder.AddActivity("TakeProduct", QueueNames.GetActivityUri(nameof(TakeProductActivity)), new
{
context.Message.OrderId,
Baskets
});
builder.AddVariable("OrderId", context.Message.OrderId);
var routingSlip = builder.Build();
await context.Execute(routingSlip);

我有TakeProductActivity活动:公共类TakeProductActivity:IActivity<TakeProductArgument,TakeProductLog>:…

public async Task<ExecutionResult> Execute(ExecuteContext<TakeProductArgument> context)
{
logger.LogInformation($"Take Product Courier called for order {context.Arguments.OrderId}");            
var uri = QueueNames.GetMessageUri(nameof(TakeProductTransactionMessage));
var sendEndpoint = await context.GetSendEndpoint(uri);
await sendEndpoint.Send<TakeProductTransactionMessage>(new
{
ProductBaskets = context.Arguments.Baskets                
});

return context.Completed(new { Baskets = context.Arguments.Baskets, OrderId=context.Arguments.OrderId });
}

当我使用sendEndpoint.Send((方法(fire&forget(时,当服务中发生异常时,补偿方法不会自动激活,但当我使用requestClient.GetResponse(request/respoy(方法调用服务时,当异常发生时,会自动调用Compensate方法。在PaymentConsumer中,当抛出异常时,必须调用补偿支付方法,但没有!

///this class has implemented in another micro-service hosted separate process:
public class TakeProductTransactionConsumer : IConsumer<TakeProductTransactionMessage>
....
public async Task Consume(ConsumeContext<TakeProductTransactionMessage> context)
{
if(context.Message.ProductBaskets.Count>0)
{ 
throw new Exception("Process Failed!");
}
logger.LogInformation($"Take product called ");

Dictionary<int, int> productCounts = new Dictionary<int, int>();
foreach (var item in context.Message.ProductBaskets)
{
productCounts.Add(item.ProductId, item.Count);
}
var products = await productService.TakeProducts(productCounts);
await publishEndpoint.Publish<ProductsUpdatedEvent>(new
{
ProductUpdatedEvents = products.Select(p =>new { ProductId = p.Id,p.Price,p.Count}).ToList()
});


}

问题是MassTransit无法从rabbitMQ中获取Exception并自动调用补偿方法。当路由器打滑活动中抛出异常时,我应该如何对MassTransit说呼叫补偿

如果您的Take Product活动使用Send to即发即弃到Take Product服务,而该服务抛出异常,则该活动将永远不会知道它,因为它已经完成。即发即弃只是,目的地服务中没有观察到任何例外。

如果您希望在take-product服务抛出异常时,take-product活动失败,则需要使用请求/响应来观察服务中的异常。

相关内容

  • 没有找到相关文章

最新更新