我正试图弄清楚为什么"发送";消息不调用状态机;发布"一条消息,它起作用了,我可以看到状态在变化。
以下是我的代码,它与文档类似,只是我试图";发送";一条消息。
组件
状态机:
public class OrderState: SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public int CurrentState { get; set; }
public DateTime? OrderDate { get; set; }
}
public class OrderStateMachine: MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; }
public State Accepted { get; private set; }
public State Completed { get; private set; }
public Event<SubmitOrder> SubmitOrder { get; private set; }
public Event<OrderAccepted> OrderAccepted { get; private set; }
public Event<OrderCompleted> OrderCompleted { get; private set; }
public OrderStateMachine()
{
InstanceState(x => x.CurrentState, Submitted, Accepted, Completed);
Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId));
Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));
Initially(
When(SubmitOrder)
.Then(context => context.Instance.OrderDate = context.Data.OrderDate)
.TransitionTo(Submitted));
During(Submitted,
When(OrderAccepted)
.TransitionTo(Accepted));
During(Accepted,
Ignore(SubmitOrder));
DuringAny(
When(OrderCompleted)
.TransitionTo(Completed));
SetCompleted(async instance =>
{
var currentState = await this.GetState(instance);
return Completed.Equals(currentState);
});
}
}
合同:
public record SubmitOrder(Guid OrderId, DateTime? OrderDate);
public record OrderAccepted(Guid OrderId);
public record OrderCompleted(Guid OrderId);
消费者:
public class SubmitOrderConsumer: IConsumer<SubmitOrder>
{
public async Task Consume(ConsumeContext<SubmitOrder> context)
{
await Task.Delay(2000);
}
}
public class SubmitOrderConsumerDefinition : ConsumerDefinition<SubmitOrderConsumer>
{
public SubmitOrderConsumerDefinition()
{
EndpointName = "submit-order";
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
{
endpointConfigurator.ConfigureConsumeTopology = false;
}
}
Web API
程序.cs(代码段(
// Add services to the container.
builder.Services.AddMassTransit(cfg =>
{
cfg.SetKebabCaseEndpointNameFormatter();
cfg.UsingRabbitMq((context, configurator) =>
{
configurator.Host("localhost", "/", hostConfigurator =>
{
hostConfigurator.Username("guest");
hostConfigurator.Password("guest");
});
});
});
builder.Services.AddMassTransitHostedService();
builder.Services.AddControllers();
订单控制器
[Route("order")]
public class OrderController : ControllerBase
{
private readonly ISendEndpointProvider _sendEndpointProvider;
public OrderController(ISendEndpointProvider sendEndpointProvider)
{
_sendEndpointProvider = sendEndpointProvider;
}
[HttpPost]
public async Task<IActionResult> SendOrder()
{
var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("exchange:submit-order"));
await endpoint.Send(new SubmitOrder(Guid.NewGuid(), DateTime.Now));
return Ok();
}
}
工人服务
程序.cs
using IHost = Microsoft.Extensions.Hosting.IHost;
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddMassTransit(cfg =>
{
cfg.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));
cfg.AddSagaStateMachine<OrderStateMachine, OrderState>().InMemoryRepository();
cfg.UsingRabbitMq((context, rabbitMqConfigurator) =>
{
rabbitMqConfigurator.Host("localhost", "/", hostConfigurator =>
{
hostConfigurator.Username("guest");
hostConfigurator.Password("guest");
});
rabbitMqConfigurator.ReceiveEndpoint("saga-order", endpointConfigurator =>
{
endpointConfigurator.ConfigureSaga<OrderState>(context);
});
rabbitMqConfigurator.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
services.AddHostedService<Worker>();
})
.Build();
await host.RunAsync();
然后我通过邮差发一个帖子:http://localhost:5000/order
它确实调用了SubmitOrderConsumer,但由于某种原因,状态机没有被调用(它不会在Then处理程序中遇到断点,该处理程序将订单日期设置在Initially状态中。(。我想我缺少了将两者连接在一起的东西。非常感谢您的反馈。非常感谢。
在您的示例中,您希望使用Publish
,尤其是在这个场景中,您有两个消费者(消费者和状态机(在不同的端点(队列(上,它们将使用消息。直接发送到交换机只会将消息发送到其中一个端点。