RabbitMQ 发布/订阅:关闭最后一个使用者将关闭发布者的频道/模型。为什么?



我正在使用.Net RabbitMQ作为一些发布/订阅(发布者/订阅者)代码。 在我开始关闭消费者之前,一切都很好。 使用者正确处理已发布的数据,直到我关闭最后一个使用者。 毕竟是消费者,我打开了一个新的消费者,但什么也没发生。 应用程序将打开,但不会从发布者接收任何数据。

我检查了发布者代码,发现当最后一个使用者关闭时,其频道的 IsOpen 属性变为 false。 我不知道是否有一些设置可以保持频道打开,即使它的消费者关闭了。

这是我的发布者代码:编辑我最初粘贴了错误的代码。

这是我的消费者代码:

public MyConsumer
{
private readonly ConnectionFactory _factory;
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly Timer _timer;
private SubscriptionConsumerType(string ipAddress, string exchangeName, TimeSpan tsPullCycle)
{
    //set up connection
    this._factory = new ConnectionFactory();
    this._factory.HostName = ipAddress;
    this._connection = this._factory.CreateConnection();
    this._channel = this._connection.CreateModel();
    //set up and bind the exchange
    this._channel.ExchangeDeclare(exchangeName, "fanout", false, true, new Dictionary<string, object>());
    string queueName = this._channel.QueueDeclare().QueueName;
    this._channel.QueueBind(queueName, exchangeName, "");
    //start consuming
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(this._channel);
    this._channel.BasicConsume(queueName, true, consumer);
    //periodically check for new messages from the publisher
    this._timer = new Timer(new TimerCallback(this.TimerStep), consumer, tsPullCycle, tsPullCycle);
}
public void Dispose()
{
    if (this._timer != null)
        this._timer.Dispose();
    if (this._channel != null)
    {
        this._channel.Close();
        this._channel.Dispose();
    }
    if (this._connection != null)
    {
        this._connection.Close();
        this._connection.Dispose();
    }
}
}

现在,我的解决方法是始终在某个地方打开一个消费者窗口。 理想情况下,我希望我的发布者无论打开多少个使用者窗口都能运行。 谢谢。

编辑 哎呀,我粘贴了错误的生产者代码。 这是:

private SubscriptionBroadcastType(string ipAddress, string exchangeName)
{
    this._factory = new ConnectionFactory();
    this._factory.HostName = ipAddress;
    this._connection = this._factory.CreateConnection();
    this._channel = this._connection.CreateModel();
    this._exchangeName = exchangeName;
    this._channel.ExchangeDeclare(exchangeName, SubscriptionBroadcastType.BROADCAST, SubscriptionBroadcastType.DURABLE, SubscriptionBroadcastType.AUTO_DELETE, new Dictionary<string, object>());
}
public void BroadcastMessage(string message)
{
    lock (this._syncroot) //protect _channel
    {
        if (this._channel.IsOpen)
            this._channel.BasicPublish(this._exchangeName, "", null, System.Text.Encoding.UTF8.GetBytes(message));
    }
}

我认为你可能在这里有一些根本性的问题。 请检查您是否发布了正确的代码。 正如我所读到的,您有一个生产者创建了一个特定的命名队列并直接发布到队列中。 您有一个消费者创建一个特定的命名交易所,然后创建一个动态命名的新队列并将其绑定到交易所。 然后,您将从此队列中读取,但它不能是您最初发布到的队列。

我会首先修复您的代码,以在您的发布者代码中添加创建交换,该交换具有您在使用者代码中有权访问的特定名称。 此行将出现在生产者线程中,而不是队列声明行中:

this._channel.ExchangeDeclare(exchangeName, "fanout", false, true, new Dictionary<string, object>());

然后,您需要发布到该交易所,以便将其更改为:

this._channel.BasicPublish(exchangeName, "", this._basicProperties, System.Text.Encoding.UTF8.GetBytes(message));

您的使用者应该被很好地设置为按原样从队列接收这些消息。 看看这是否有助于解决您的问题。

最新更新