MqttNet-MQTT客户端无法接收MQTT服务器发布的所有消息



当客户端订阅主题时,我从MQTT服务器发布1000条消息。MQTT客户端没有接收到某些消息。这是一个错误还是我实现了一些错误?

以下是服务器和客户端配置示例。

服务器配置

using MQTTnet;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MqttNetServerExample
{
class Program
{
private static IMqttServer _mqttServer;
static void Main(string[] args)
{
// Configure MQTT server.
var optionsBuilder = new MqttServerOptionsBuilder().WithConnectionBacklog(100)
.WithDefaultEndpointPort(1884);
// Define a mqttServer
_mqttServer = new MqttFactory().CreateMqttServer();
// Message arrived configuration
_mqttServer.UseApplicationMessageReceivedHandler(async e =>
{
if (Encoding.UTF8.GetString(e.ApplicationMessage.Payload)== "Test Message")
{
Console.WriteLine("subscription message received");
Console.WriteLine("Simulating messages...");
await Simulate();
}
});
// When a new client connected
_mqttServer.UseClientConnectedHandler(e =>
{
Console.WriteLine("***** CLIENT CONNECTED : " + e.ClientId + " *******");
});
// Start the mqtt server
_mqttServer.StartAsync(optionsBuilder.Build());
Console.ReadLine();
}
private static async Task PublishMessage(string message)
{
// Create mqttMessage
var mqttMessage = new MqttApplicationMessageBuilder()
.WithTopic("mqttServerTopic")
.WithPayload(message)
.WithAtLeastOnceQoS()
.WithRetainFlag(false)
.WithDupFlag(false)
.Build();
// Publish the message asynchronously
var result = await _mqttServer.PublishAsync(mqttMessage, CancellationToken.None);
if(result.ReasonCode == MQTTnet.Client.Publishing.MqttClientPublishReasonCode.Success)
Console.WriteLine("Message published : " + message);
}
private static async Task Simulate()
{
for (int i = 0; i < 1000; i++)
{
var message = "This is a message from server " + i.ToString();
await PublishMessage(message);
}
}
}
}

客户端配置

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Subscribing;
using System;
using System.Text;
using System.Threading;
namespace MqttNetClientExample
{
class Program
{
private static IMqttClient _mqttClient;
static void Main(string[] args)
{
// Create client
_mqttClient = new MqttFactory().CreateMqttClient();
var options = new MqttClientOptionsBuilder().WithClientId("MqttClient")
.WithTcpServer("localhost", 1884)
.Build();
// When client connected to the server
_mqttClient.UseConnectedHandler(async e =>
{
// Subscribe to a topic
MqttClientSubscribeResult subResult = await _mqttClient.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
 .WithTopicFilter("mqttServerTopic")
 .Build());
// Sen a test message to the server
PublishMessage("Test Message");
});
// When client received a message from server
_mqttClient.UseApplicationMessageReceivedHandler(e =>
{
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
});
// Connect ot server
_mqttClient.ConnectAsync(options, CancellationToken.None);
Console.Read();
}
private static async void PublishMessage(string message)
{
// Create mqttMessage
var mqttMessage = new MqttApplicationMessageBuilder()
.WithTopic("mqttServerTopic")
.WithPayload(message)
.WithExactlyOnceQoS()
.Build();
// Publish the message asynchronously
await _mqttClient.PublishAsync(mqttMessage, CancellationToken.None);
}
}
}

结果输出示例

这是来自客户端应用程序的示例输出:

+ Payload = Test Message
+ Payload = This is a message from server 0
+ Payload = This is a message from server 1
+ Payload = This is a message from server 2
+ Payload = This is a message from server 3
+ Payload = This is a message from server 4
+ Payload = This is a message from server 5
+ Payload = This is a message from server 6
+ Payload = This is a message from server 7
+ Payload = This is a message from server 8
+ Payload = This is a message from server 9
+ Payload = This is a message from server 10
+ Payload = This is a message from server 11
+ Payload = This is a message from server 13
+ Payload = This is a message from server 14
+ Payload = This is a message from server 23
+ Payload = This is a message from server 53
+ Payload = This is a message from server 54
+ Payload = This is a message from server 55
+ Payload = This is a message from server 56
+ Payload = This is a message from server 57
+ Payload = This is a message from server 83
+ Payload = This is a message from server 105
+ Payload = This is a message from server 120
+ Payload = This is a message from server 138
+ Payload = This is a message from server 139
+ Payload = This is a message from server 140
+ Payload = This is a message from server 141
+ Payload = This is a message from server 172
+ Payload = This is a message from server 192
+ Payload = This is a message from server 207
+ Payload = This is a message from server 218
+ Payload = This is a message from server 236
+ Payload = This is a message from server 258
+ Payload = This is a message from server 278
+ Payload = This is a message from server 302
.
.
.
.
+ Payload = This is a message from server 999

正如您所看到的,有些消息丢失了。如何在不丢失的情况下获取所有消息?谢谢

根据评论,问题与MqttNet Broker的配置有关。

设置MaxPendingMessagesPerClient默认为250,这意味着当有250条消息等待为特定客户端传递时,另一条消息到达,队列中的第一条消息将被丢弃。

您的应用程序发送消息的速度快于接收方处理消息的速度,因此队列构建得很快,消息就会丢失。最后250条消息到达时没有问题,因为没有发送任何新消息(因此代理无需丢弃更多消息(。

要解决此问题,您可以使用WithMaxPendingMessagesPerClient,例如

var optionsBuilder = new MqttServerOptionsBuilder()                
.WithMaxPendingMessagesPerClient(1000);  

注意:生产中用于此设置的值将取决于您的具体要求。您需要确保订阅者可以按照消息的生成速率使用消息(否则代理队列将持续增长(。

最新更新