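I have a setup with a separate CommandBus and EventBus. A consumer listens for commands on the command bus and, after handling the operation, publishes the related events to the event bus. I want to use the built-in support for the transactional outbox pattern on the EventBus.
Here is a link to a repo.
The application is configured as follows: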
public static void ConfigureServices(HostBuilderContext host, IServiceCollection services)
{
    services.Configure<MessageBrokerConfiguration>(host.Configuration.GetSection("MessageBroker"));
    var brokerConfiguration = new MessageBrokerConfiguration();
    host.Configuration.Bind("MessageBroker", brokerConfiguration);
    services.AddHostedService<DatabaseMigratorHostedService>();

    // Command bus (additional MultiBus instance): hosts the command consumers.
    services.AddMassTransit<ICommandBus>(mt =>
    {
        mt.UsingRabbitMq((context, configurator) =>
        {
            configurator.Host(brokerConfiguration.CommandBus);
            configurator.ConfigureEndpoints(context);
        });
        mt.AddConsumersFromNamespaceContaining<CreateOrderConsumer>();
    });

    // Event bus (primary IBus instance): configured with the Entity Framework outbox.
    services.AddMassTransit(mt =>
    {
        mt.AddEntityFrameworkOutbox<OrderContext>(options =>
        {
            options.QueryDelay = TimeSpan.FromSeconds(1);
            options.UsePostgres();
            options.UseBusOutbox();
        });
        mt.UsingRabbitMq((context, configurator) =>
        {
            configurator.Host(brokerConfiguration.EventBus);
            configurator.ConfigureEndpoints(context);
        });
    });

    services.AddRepositories(host.Configuration);
    services.AddScoped<IEventEmitter, MasstransitEventEmitter>();
}
Below is my consumer, which listens for commands on one bus and publishes events to the other bus:
public sealed class CreateOrderConsumer
    : IConsumer<CreateOrder>
{
    private readonly IEventEmitter _eventEmitter;
    private readonly IUnitOfWork _unitOfWork;
    private readonly IRepository<Order> _repository;

    public CreateOrderConsumer(
        IRepository<Order> repository,
        IUnitOfWork unitOfWork,
        IEventEmitter eventEmitter)
    {
        _unitOfWork = Guard.Against.Null(unitOfWork);
        _repository = Guard.Against.Null(repository);
        _eventEmitter = Guard.Against.Null(eventEmitter);
    }

    public async Task Consume(ConsumeContext<CreateOrder> context)
    {
        var order = new Order(context.Message.ProductId, context.Message.Quantity);
        await _repository.StoreAsync(order);

        // Publish the domain events before committing the unit of work.
        await _eventEmitter.Emit(order.DomainEvents);
        order.ClearDomainEvents();
        await _unitOfWork.CommitAsync();

        await context.RespondAsync<CreateOrderResult>(new { OrderId = order.Id });
    }
}
And my IEventEmitter takes an IBus:
public sealed class MasstransitEventEmitter : IEventEmitter
{
    private readonly IPublishEndpoint _publishEndpoint;

    public MasstransitEventEmitter(IBus publishEndpoint)
    {
        _publishEndpoint = Guard.Against.Null(publishEndpoint);
    }

    public async Task Emit(IEnumerable<IDomainEvent> domainEvents)
    {
        try
        {
            foreach (var domainEvent in domainEvents)
            {
                await _publishEndpoint.Publish(domainEvent, domainEvent.GetType(), CancellationToken.None);
            }
        }
        catch (Exception)
        {
            // ignored
        }
    }
}
Here is the DbContext used for both the business logic and the transactional outbox configuration:
public sealed class OrderContext : DbContext, IUnitOfWork
{
    public OrderContext(DbContextOptions<OrderContext> options) : base(options)
    {
    }

    internal DbSet<OrderEntity> Orders { get; private set; } = default!;

    public async Task CommitAsync(CancellationToken cancellationToken = default)
        => await this.SaveChangesAsync(cancellationToken);

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
        modelBuilder.ApplyConfiguration(new OrderEntityConfiguration());

        // Tables used by the MassTransit transactional outbox.
        modelBuilder.AddInboxStateEntity();
        modelBuilder.AddOutboxMessageEntity();
        modelBuilder.AddOutboxStateEntity();
    }
}
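The API layer sends the command to the command bus through IRequestClient<CreateOrder> and waits for the response. The problem is that when the event bus (not the command bus) is down, the transactional outbox does not work, and the call keeps running until a timeout exception occurs.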
[HttpPost]
public async Task<IActionResult> Post(
    [FromBody] CreateOrderDto createOrderDto,
    [FromServices] IRequestClient<CreateOrder> createOrderRequestClient)
{
    var result = await createOrderRequestClient.GetResponse<CreateOrderResult>(
        new CreateOrder { ProductId = createOrderDto.ProductId, Quantity = createOrderDto.Quantity },
        timeout: RequestTimeout.After(m: 2));
    return Ok(result);
}
And the API's logs:
info: MassTransit[0]
Bus started: rabbitmq://localhost/
info: Microsoft.Hosting.Lifetime[14]
Now listening on: https://localhost:7129
info: Microsoft.Hosting.Lifetime[14]
Now listening on: http://localhost:5134
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
Content root path: /Users/shahab/dev/talks/Demo.TransactionalOutbox/Demo.TransactionalOutbox.Api
The application layer's logs (it listens for commands and publishes events):
[13:33:45 INF] Configured endpoint CancelOrder, Consumer: Demo.TransactionalOutbox.Application.Consumers.CancelOrderConsumer
[13:33:45 INF] Configured endpoint CreateOrder, Consumer: Demo.TransactionalOutbox.Application.Consumers.CreateOrderConsumer
[13:33:45 INF] Configured endpoint GetOrder, Consumer: Demo.TransactionalOutbox.Application.Consumers.GetOrderConsumer
[13:33:49 DBG] Starting bus instances: ICommandBus, IBus
[13:33:49 DBG] Starting bus: rabbitmq://localhost/
[13:33:49 DBG] Starting bus: rabbitmq://localhost:6666/
[13:33:49 DBG] Connect: guest@localhost:5672/
[13:33:49 DBG] Connect: guest@localhost:6666/
[13:33:49 DBG] Connected: guest@localhost:5672/ (address: amqp://localhost:5672, local: 49955)
[13:33:49 DBG] Connected: guest@localhost:6666/ (address: amqp://localhost:6666, local: 49954)
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/McShahab_DemoTransactio_bus_5emoyydyan1f7qhobdppkkw6gp?temporary=true
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost:6666/McShahab_DemoTransactio_bus_5emoyydyan1f7jiabdppkkw9bz?temporary=true
[13:33:50 INF] Bus started: rabbitmq://localhost:6666/
[13:33:50 DBG] Declare queue: name: CancelOrder, durable, consumer-count: 0 message-count: 0
[13:33:50 DBG] Declare queue: name: CreateOrder, durable, consumer-count: 0 message-count: 0
[13:33:50 DBG] Declare queue: name: GetOrder, durable, consumer-count: 0 message-count: 0
[13:33:50 DBG] Declare exchange: name: GetOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: CreateOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: CancelOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.OrderAggregate.Queries:GetOrderStatus, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CancelOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CreateOrder, type: fanout, durable
[13:33:50 DBG] Bind queue: source: GetOrder, destination: GetOrder
[13:33:50 DBG] Bind queue: source: CancelOrder, destination: CancelOrder
[13:33:50 DBG] Bind queue: source: CreateOrder, destination: CreateOrder
[13:33:50 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CreateOrder, destination: CreateOrder
[13:33:50 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.OrderAggregate.Queries:GetOrderStatus, destination: GetOrder
[13:33:50 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CancelOrder, destination: CancelOrder
[13:33:50 DBG] Consumer Ok: rabbitmq://localhost/GetOrder - amq.ctag-jT06Ly0B8--gYF2XxxxyGQ
[13:33:50 DBG] Consumer Ok: rabbitmq://localhost/CreateOrder - amq.ctag-K2-6Gcdxk8z6UPxI0q-xQw
[13:33:50 DBG] Consumer Ok: rabbitmq://localhost/CancelOrder - amq.ctag-YRlkqWCWLKPX1JpCtEThJQ
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/CreateOrder
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/GetOrder
[13:33:50 INF] Bus started: rabbitmq://localhost/
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/CancelOrder
And the event consumer's logs:
[13:33:47 INF] Configured endpoint OrderCreated, Consumer: Demo.TransactionalOutbox.FancyConsumer.OrderCreatedConsumer
[13:33:48 DBG] Starting bus instances: IBus
[13:33:48 DBG] Starting bus: rabbitmq://localhost:6666/
[13:33:48 DBG] Connect: guest@localhost:6666/
[13:33:48 DBG] Connected: guest@localhost:6666/ (address: amqp://localhost:6666, local: 49947)
[13:33:48 DBG] Endpoint Ready: rabbitmq://localhost:6666/McShahab_DemoTransactio_bus_hrmoyydyan1fh45qbdppkkiyy5?temporary=true
[13:33:48 DBG] Declare queue: name: OrderCreated, durable, consumer-count: 0 message-count: 0
[13:33:48 DBG] Declare exchange: name: OrderCreated, type: fanout, durable
[13:33:48 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.Events:OrderCreated, type: fanout, durable
[13:33:48 DBG] Bind queue: source: OrderCreated, destination: OrderCreated
[13:33:48 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.Events:OrderCreated, destination: OrderCreated
[13:33:48 DBG] Consumer Ok: rabbitmq://localhost:6666/OrderCreated - amq.ctag-53c0dDTumv3l33VqwMiSpA
[13:33:48 DBG] Endpoint Ready: rabbitmq://localhost:6666/OrderCreated
[13:33:48 INF] Bus started: rabbitmq://localhost:6666/
In contrast to the sample application for the outbox pattern, I don't see any outbox logs at all, whether RabbitMQ is up and running or down.
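Short answer: the transactional outbox only works with the primary (IBus) bus instance. Any other bus instance created with MultiBus cannot use the transactional outbox at this time.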
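Update: in your event emitter, you can't use IBus as the publish endpoint, because it isn't scoped. But you can't just use IPublishEndpoint either, because it will likely resolve to the ConsumeContext of the consumer on the command bus. The underlying plumbing for the transactional outbox isn't really set up to go from a consumer on one bus to producing events on another bus.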
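
For reference, the behaviour the question expects from a transactional outbox can be sketched without MassTransit at all. The snippet below is only an in-memory illustration under stated assumptions: FakeDatabase, FakeBroker, OutboxMessage and OutboxDelivery are hypothetical names invented for this sketch, not MassTransit types, and the "transaction" is simulated by writing both rows in one method. It shows why a broker outage should not fail the write: the event is stored next to the order, and a separate delivery pass publishes it once the broker is reachable again.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical outbox row; not a MassTransit type.
public sealed class OutboxMessage
{
    public OutboxMessage(string type, string payload)
    {
        Type = type;
        Payload = payload;
    }

    public string Type { get; }
    public string Payload { get; }
    public bool Delivered { get; set; }
}

// Hypothetical stand-in for the database that holds both business data and the outbox.
public sealed class FakeDatabase
{
    public List<string> Orders { get; } = new();
    public List<OutboxMessage> Outbox { get; } = new();

    // Simulates one transaction: the order and its event are stored together.
    public void SaveOrderWithEvent(string orderId)
    {
        Orders.Add(orderId);
        Outbox.Add(new OutboxMessage("OrderCreated", orderId));
    }
}

// Hypothetical stand-in for the event bus broker, which may be down.
public sealed class FakeBroker
{
    public bool IsUp { get; set; }
    public List<string> Published { get; } = new();

    public void Publish(OutboxMessage message)
    {
        if (!IsUp) throw new InvalidOperationException("broker unavailable");
        Published.Add(message.Payload);
    }
}

public static class OutboxDelivery
{
    // One delivery pass: publish pending messages, leave the rest for the next pass.
    public static int DeliverPending(FakeDatabase db, FakeBroker broker)
    {
        var delivered = 0;
        foreach (var message in db.Outbox.Where(m => !m.Delivered))
        {
            try
            {
                broker.Publish(message);
                message.Delivered = true;
                delivered++;
            }
            catch (InvalidOperationException)
            {
                break; // broker is down; try again on the next pass
            }
        }
        return delivered;
    }
}
```

In this sketch, calling SaveOrderWithEvent succeeds while the broker is down, and a later call to DeliverPending publishes the stored event once IsUp is true. In the setup from the question that hand-off does not happen, because the event is published from a consumer running on the secondary bus.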