版本
- MassTransit 7.0.4
- MassTransit.RabbitMQ 7.0.4
- MassTransit.Extensions.DependencyInjection 7.0.4
我创建了一个Masstransit发布者和消费者。类似于Masstransit视频中的示例。但是,没有呼叫消费者。
这里是.NET主机的初始设置
namespace RedeliveryTest
{
internal class Program
{
static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.AddConsumer<RedeliveryTest.MessageConsumer>(typeof(RedeliveryTest.MessageConsumerDefinition));
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost");
cfg.ConfigureEndpoints(context);
});
});
services.AddHostedService<Worker>();
});
}
}
工人发布合同
namespace RedeliveryTest
{
public class Worker : BackgroundService
{
readonly IBus _bus;
public Worker(IBus bus, ILogger<Worker> logger)
{
_bus = bus;
}
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await _bus.Publish(new Message { Text = $"The time is {DateTimeOffset.Now}" }, stoppingToken);
await Task.Delay(1000, stoppingToken);
}
}
}
}
消费者的代码
namespace RedeliveryTest
{
public class Message
{
public string Text { get; set; }
}
public class MessageConsumer :
IConsumer<Message>
{
readonly ILogger<MessageConsumer> _logger;
public MessageConsumer()
{
}
public Task Consume(ConsumeContext<Message> context)
{
return Task.CompletedTask;
}
}
public class MessageConsumerDefinition :
ConsumerDefinition<MessageConsumer>
{
public MessageConsumerDefinition()
{
EndpointName = $"test-message-queue";
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<MessageConsumer> consumerConfigurator)
{
endpointConfigurator.UseMessageRetry(r => r.Intervals(500, 1000));
endpointConfigurator.UseInMemoryOutbox();
}
}
}
在rabbitmqadmin页面中(http://localhost:15672/)我可以看出,出版商在";交易所;但是队列";测试消息队列";未创建。
当我创建第二个控制台应用程序来只托管消费者时,该消费者确实会收到第一个控制台应用软件发布的消息。这里是第二个消费者的来源。
namespace ConsuleConsumer
{
public class Program
{
public static async Task Main(string[] args)
{
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.AddConsumer<RedeliveryTest.MessageConsumer>(typeof(RedeliveryTest.MessageConsumerDefinition));
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost");
cfg.ConfigureEndpoints(context);
});
});
var serviceProvider = services.BuildServiceProvider();
var bus = serviceProvider.GetRequiredService<IBusControl>();
await bus.StartAsync();
Console.WriteLine("Press any key to exit");
await Task.Run(() => Console.ReadKey());
await bus.StopAsync();
}
}
namespace RedeliveryTest
{
public class Message
{
public string Text { get; set; }
}
public class MessageConsumer :
IConsumer<Message>
{
public MessageConsumer() { }
public Task Consume(ConsumeContext<Message> context)
{
return Task.CompletedTask;
}
}
public class MessageConsumerDefinition :
ConsumerDefinition<MessageConsumer>
{
public MessageConsumerDefinition()
{
EndpointName = $"test-message-queue";
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<MessageConsumer> consumerConfigurator)
{
endpointConfigurator.UseMessageRetry(r => r.Intervals(500, 1000));
endpointConfigurator.UseInMemoryOutbox();
}
}
}
有了第二个控制台应用程序,我现在可以在rabbitmqadmin页面中看到队列。
问题为什么使用者在.NET通用主机中托管时似乎不起作用?
由于您使用的是旧版本的MassTransit,因此启动总线需要托管服务。看到这个答案,或者总结一下:
之前。。。
services.AddHostedService<Worker>();
包括:
services.AddMassTransitHostedService();
请注意,如果使用MassTransit v8,则不需要执行此操作。