在windows服务中如何正确处理RabbitMQ事件?



我以前从未创建过Windows服务,现在我被要求这样做。我需要创建一个服务来处理RabbitMQ消费者引发的事件。

在我看来,一旦你连接了事件处理程序,你就需要一些东西来"保持服务的活动"。否则,服务将结束,处理程序将消失。我看到的代码样本似乎表明你把应用程序放入了一个while(true)循环——这感觉不对。

这是我到目前为止写的

public class MyWorker : BackgroundService
{
protected override async Task ExcecuteAsync(CancellationToken stoppingToken)
{
var _factory = new ConnectionFactory() {
HostName = "localhost",
UserName = "guest",
Password = "guest"
};
var _connection = _factory.CreateConnection();
var _channel = _connection.CreateModel();
_factory.DispatchConsumersAsync = true;
var _consumer = new AsyncEventingBasicConsumer(_channel);
_consumer.Received += async (sender, ea) => {
// Code to handle queue item here
_channel.BasicAck(ea.DeliveryTag,true);
await Task.Yield();
};
var tag = _consumer.BasicConsume("MyQueue",false,_consumer);
while(!stoppingToken.IsCancellationRequested){};
_consumer.BasicCancel(tag);
}

在服务中这样做是正确的吗?while循环会消耗CPU周期吗?

是的,您的代码将正常工作。我花了一点时间对它进行了扩展-

namespace rabbitmq_backgroundservice;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
public class Worker : BackgroundService
{
private const string _queueName = "MyQueue";
private readonly TimeSpan _stoppingCheckInterval = TimeSpan.FromSeconds(5);
private readonly ILogger<Worker> _logger;
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly EventingBasicConsumer _consumer;
private readonly string _consumerTag;
public Worker(ILogger<Worker> logger)
{
_logger = logger;
var factory = new ConnectionFactory {
HostName = "localhost",
UserName = "guest",
Password = "guest"
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(_queueName, true, true);
_consumer = new EventingBasicConsumer(_channel);
_consumer.Received += ReceivedHandler;
_consumerTag = _channel.BasicConsume(_queueName, false, _consumer);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using (_connection)
using (_channel)
{
while (!stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
await Task.Delay(_stoppingCheckInterval, stoppingToken);
}
_logger.LogInformation("Worker STOPPING at: {time}", DateTimeOffset.Now);
_channel.BasicCancel(_consumerTag);
}
}
private void ReceivedHandler(object? sender, BasicDeliverEventArgs ea)
{
var tag = ea.DeliveryTag;
_logger.LogInformation("Received message. tag: {tag}  at: {time}", tag, DateTimeOffset.Now);
_channel.BasicAck(tag, false);
}
}

存储库:https://github.com/lukebakken/rabbitmq-backgroundservice

我使用了这个参考:

https://learn.microsoft.com/en-us/dotnet/core/extensions/windows-service


注意:RabbitMQ团队监控rabbitmq-users邮件列表,有时只在StackOverflow上回答问题。

最新更新