丢失带有C#uPLibrary.Networking.M2Mqtt的MQTT的消息



我有一个问题,我用MQTT丢失了消息,尽管我用"QOS_LEVEL_EXACTLY_ONCE";。

只有当接收器没有运行,然后稍后启动时,才会出现损失。然后不收集这些消息。

M2Mqtt版本为4.3.0

如果两个客户端(即接收器和发送器(都在运行,则不会丢失任何消息。

只有当接收器没有运行时,消息才会在这段时间内被预取,并且不会到达接收器。

我在服务器(代理(上找不到应该保存多长时间的消息的任何设置

发送器

public class Programm
{
static MqttClient mqttClient;
static async Task Main(string[] args)
{
var locahlost = true;
var clientName = "Sender 1";
Console.WriteLine($"{clientName} Startet");
var servr = locahlost ? "localhost" : "test.mosquitto.org";
mqttClient = new MqttClient(servr);
mqttClient.Connect(clientName);
Task.Run(() =>
{
if (mqttClient != null && mqttClient.IsConnected)
{
for (int i = 0; i < 100; i++)
{
var Message = $"{clientName} ->Test {i}";
mqttClient.Publish("Application1/NEW_Message", Encoding.UTF8.GetBytes($"{Message}"), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, true);
Console.WriteLine(Message);
Thread.Sleep(i * 1000);
}
}
});
Console.WriteLine($"{clientName} End");
}
}

服务器

public class Programm
{

static async Task Main(string[] args)
{
Console.WriteLine("Server");
MqttServerOptionsBuilder options = new MqttServerOptionsBuilder()
// set endpoint to localhost
.WithDefaultEndpoint()
// port used will be 707
.WithDefaultEndpointPort(1883);
// handler for new connections

// creates a new mqtt server     
IMqttServer mqttServer = new MqttFactory().CreateMqttServer();
// start the server with options  
mqttServer.StartAsync(options.Build()).GetAwaiter().GetResult();
// keep application running until user press a key
Console.ReadLine();
}
}

接收器

public class Programm
{
static MqttClient mqttClient;

static async Task Main(string[] args)
{
var clientName = "Emfänger 1";
var locahlost = true;
Console.WriteLine($"Start of {clientName}");

Task.Run(() =>
{
var servr = locahlost ? "localhost" : "test.mosquitto.org";
mqttClient = new MqttClient(servr);
mqttClient.MqttMsgPublishReceived += MqttClient_MqttMsgPublishReceived;
mqttClient.Subscribe(new string[] { "Application1/NEW_Message" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
mqttClient.Connect(clientName);
});

//  client.UseConnecedHandler(e=> {Console.WriteLine("Verbunden") });
Console.ReadLine();
Console.WriteLine($"end of  {clientName}");
Console.ReadLine();

}
private static void MqttClient_MqttMsgPublishReceived(object sender, uPLibrary.Networking.M2Mqtt.Messages.MqttMsgPublishEventArgs e)
{
var message = Encoding.UTF8.GetString(e.Message);
Console.WriteLine(message);
}

}

使用M2MQTT连接到代理时,Clean会话标志的默认值为true。

这意味着代理将丢弃任何排队的消息。

https://m2mqtt.wordpress.com/using-mqttclient/

您需要将其设置为false,以确保客户端接收排队的消息。

mqttClient.Connect(clientName, false);

我发现了错误,缺少保存。

这是来自服务器的新代码

static async Task Main(string[] args)
{
Console.WriteLine("Server");
MqttServerOptionsBuilder options = new MqttServerOptionsBuilder()
.WithDefaultEndpoint()
.WithDefaultEndpointPort(1883)
.WithConnectionValidator(OnNewConnection)
.WithApplicationMessageInterceptor(OnNewMessage)
.WithClientMessageQueueInterceptor(OnOut)
.WithDefaultCommunicationTimeout(TimeSpan.FromMinutes(5))
.WithMaxPendingMessagesPerClient(10)
.WithPersistentSessions()
.WithStorage(storage);
// creates a new mqtt server     
IMqttServer mqttServer = new MqttFactory().CreateMqttServer();
// start the server with options  
mqttServer.StartAsync(options.Build()).GetAwaiter().GetResult();
// keep application running until user press a key
Console.ReadLine();
}

最新更新