MqttNet version 4.1.3.563 basic example



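Following this example, I now need to update MQTT.NET from version 3 (which works, thanks to the help provided) to version 4.

A very basic set of capabilities would be enough:

  1. Connect to an address with a timeout
  2. Check whether the connection succeeded
  3. Detect disconnection

In version 3 this was very easy: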

MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder()
    .WithClientId("IoApp" + HelperN.MQTT.GetClientID(true))
    .WithTcpServer("localhost", 1883);
ManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder()
    .WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
    .WithClientOptions(builder.Build())
    .Build();
mqttClient = new MqttFactory().CreateManagedMqttClient();
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);
mqttClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(OnConnectingFailed);
mqttClient.SubscribeAsync(...);
mqttClient.SubscribeAsync(...);
mqttClient.StartAsync(options).GetAwaiter().GetResult();
mqttClient.UseApplicationMessageReceivedHandler(args => { OnMessageReceived(args); });

But when it comes to version 4, I run into problems if I have to rely on the samples. Let's start with the connection:
public static async Task Connect_Client_Timeout()
{
    /*
     * This sample creates a simple MQTT client and connects to an invalid broker using a timeout.
     *
     * This is a modified version of the sample _Connect_Client_! See other sample for more details.
     */
    var mqttFactory = new MqttFactory();
    strError = String.Empty;
    using (var mqttClient = mqttFactory.CreateMqttClient())
    {
        var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer("aaaa127.0.0.1", 1883).Build();
        try
        {
            using (var timeoutToken = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
            {
                await mqttClient.ConnectAsync(mqttClientOptions, timeoutToken.Token);
            }
        }
        catch (OperationCanceledException exc)
        {
            strError = "Connect_Client_Timeout exc:" + exc.Message;
        }
    }
}

I call this task from the main program and wait for the result.

var connectTask = Connect_Client_Timeout();
connectTask.Wait(); // <----- never returns

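Because I deliberately entered a wrong address, "aaaa127.0.0.1", I expected the call to fail after 5 seconds. But connectTask.Wait() never returns. Even when I enter the correct address, "127.0.0.1", it never exits. So the problem is probably in connectTask.Wait().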
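One way to test that suspicion is to await the task instead of blocking on it. Blocking with Wait() on a thread that has a synchronization context (a WinForms or WPF UI thread, for example) can deadlock, because the continuation after the await needs the very thread that is blocked. Whether that is the cause here is an assumption, since the calling code is not shown. The sketch below assumes a console program with an async Main (C# 7.1 or later) and the MQTTnet 4 package. ConnectWithTimeoutAsync is a hypothetical helper that returns the error text rather than writing to the strError field, and the catch-all handler is added on the assumption that an unresolvable host name may surface as an exception other than OperationCanceledException.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;

public static class Program
{
    // Async entry point: the connect task is awaited, never blocked on with Wait().
    public static async Task Main()
    {
        string error = await ConnectWithTimeoutAsync("127.0.0.1", 1883, TimeSpan.FromSeconds(5));
        Console.WriteLine(error.Length == 0 ? "Connect succeeded" : error);
    }

    // Hypothetical helper (not part of MQTTnet): returns an empty string on
    // success, or a description of the failure.
    private static async Task<string> ConnectWithTimeoutAsync(string host, int port, TimeSpan timeout)
    {
        var mqttFactory = new MqttFactory();
        using (var mqttClient = mqttFactory.CreateMqttClient())
        {
            var options = new MqttClientOptionsBuilder().WithTcpServer(host, port).Build();
            try
            {
                // The token is cancelled once the timeout elapses.
                using (var timeoutToken = new CancellationTokenSource(timeout))
                {
                    await mqttClient.ConnectAsync(options, timeoutToken.Token);
                }
                return string.Empty;
            }
            catch (OperationCanceledException exc)
            {
                return "Connect timed out: " + exc.Message;
            }
            catch (Exception exc)
            {
                // Assumption: other connection failures arrive as a different exception type.
                return "Connect failed: " + exc.Message;
            }
        }
    }
}
```

As in the original method, the client is disposed when the using block ends, so this only probes whether a connection can be made. In a UI application the equivalent change would be to await the call from an async event handler.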

The solution is here. In short, you have to do this:

static async Task Connect()
{
    IManagedMqttClient _mqttClient = new MqttFactory().CreateManagedMqttClient();
    // Create client options object
    MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder()
        .WithClientId("behroozbc")
        .WithTcpServer("localhost");
    ManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder()
        .WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
        .WithClientOptions(builder.Build())
        .Build();

    // Set up handlers
    _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync;
    _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;
    _mqttClient.ConnectingFailedAsync += _mqttClient_ConnectingFailedAsync;

    // Connect to the broker
    await _mqttClient.StartAsync(options);
    // Send a new message to the broker every second
    while (true)
    {
        string json = JsonSerializer.Serialize(new { message = "Hi Mqtt", sent = DateTime.UtcNow });
        await _mqttClient.EnqueueAsync("behroozbc.ir/topic/json", json);
        await Task.Delay(TimeSpan.FromSeconds(1));
    }

    // Local functions used as event handlers
    Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
    {
        Console.WriteLine("Connected");
        return Task.CompletedTask;
    }
    Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
    {
        Console.WriteLine("Disconnected");
        return Task.CompletedTask;
    }
    Task _mqttClient_ConnectingFailedAsync(ConnectingFailedEventArgs arg)
    {
        Console.WriteLine("Connection failed check network or broker!");
        return Task.CompletedTask;
    }
}

Then call Connect() and rely on the samples for subscribing.
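For the subscription part, a minimal sketch with the managed client might look like the following. It is based on my understanding of the MQTTnet 4 managed client API (the ApplicationMessageReceivedAsync event and the SubscribeAsync method that takes a collection of topic filters). The class name, the client id "example-client", and the topic "example/topic" are placeholders chosen for illustration, not values from the post.

```csharp
using System;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;

public static class SubscribeSketch
{
    public static async Task RunAsync()
    {
        IManagedMqttClient client = new MqttFactory().CreateManagedMqttClient();

        ManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder()
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
            .WithClientOptions(new MqttClientOptionsBuilder()
                .WithClientId("example-client")      // placeholder id
                .WithTcpServer("localhost", 1883)
                .Build())
            .Build();

        // In version 4 this event replaces UseApplicationMessageReceivedHandler from version 3.
        client.ApplicationMessageReceivedAsync += e =>
        {
            Console.WriteLine("Received message on topic " + e.ApplicationMessage.Topic);
            return Task.CompletedTask;
        };

        await client.StartAsync(options);

        // The managed client keeps track of its subscriptions across reconnects.
        var filter = new MqttTopicFilterBuilder().WithTopic("example/topic").Build(); // placeholder topic
        await client.SubscribeAsync(new[] { filter });
    }
}
```

RunAsync returns as soon as the client has started and the subscription is queued, so the caller has to keep the process alive to receive messages.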
