我在VS2019中运行Windows服务项目,使用Mqtt 4.1.4 .Net
框架客户端。
我的。net客户端一遍又一遍地保存Connecting and Disconnecting
。
我最近发现我的OnSubscriberDisconnected
方法正在传递以下args
值:
args.Reason = SessionTakenOver
args.ReasonString = "Another client connected with the same client id."
最初,我在每个HiveMq Broker连接(免费云版本)上创建了一个新的随机ClientID,但我将其更改为:
clientId = ".netWinSvc-" + this.machineName;
这样,运行我的Win Service代码的机器将始终使用相同的ClientID连接。
我相信我堆积了很多新的ClientID连接,并要求代理持久化会话(CleanSession = false
)。免费的云订阅允许100 device connections
。
问题是:我该如何清理所有这些clientID连接,以及如何避免此断开/重新连接问题?重用相同的ClientID与CleanSession = false
是最好的方法吗?换句话说,我不应该要求代理persist
我的ClientID连接吗?
这是我的。net窗口服务代码的一部分:
using log4net.Ext.EventID;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Timer = System.Timers.Timer;
using MqttNotificationService.Models;
using Newtonsoft.Json;
namespace MqttNotificationService
{
public partial class MqttService : ServiceBase
{
public static readonly IEventIDLog applog = EventIDLogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
private IManagedMqttClient managedMqttClientPublisher;
private IManagedMqttClient managedMqttClientSubscriber;
private string mqttClientUser = "";
private byte[] mqttClientPswd;
private string mqttBrokerAddress;
private string mqttProtocol = "wss";
private int? mqttPort;
private string defaultMessage;
private string topicThisHost = ""; // get topic from app.config
private string heartBeatPubMsg;
private double heartBeatTimerMs;
public MqttService()
{
InitializeComponent();
}
protected override void OnStart(string[] args)
{
Init();
CreateThreadAndRun();
StartHeartBeatTimer();
}
private void Init()
{
log4net.Config.XmlConfigurator.Configure();
mqttClientUser = ConfigurationManager.AppSettings["MqttClientUser"];
mqttBrokerAddress = ConfigurationManager.AppSettings["MqttBrokerAddress"];
mqttProtocol = ConfigurationManager.AppSettings["MqttProtocol"];
mqttPort = Int16.Parse(ConfigurationManager.AppSettings["MqttPort"]);
MqttUseTls = bool.Parse(ConfigurationManager.AppSettings["UseTlsCertificate"]);
var MqttQos = Int16.Parse(ConfigurationManager.AppSettings["QualityOfService"]);
mqttRetained = bool.Parse(ConfigurationManager.AppSettings["MqttRetained"]);
mqttLastWillRetained = bool.Parse(ConfigurationManager.AppSettings["MqttLastWillRetained"]);
mqttLastWillMessage = ConfigurationManager.AppSettings["MqttLastWillMessage"];
mqttKeepAliveSeconds = Int16.Parse(ConfigurationManager.AppSettings["MqttLastWillKeepAliveSeconds"]);
CertificateFileName = ConfigurationManager.AppSettings["CertificateFileName"];
CertificatePwd = ConfigurationManager.AppSettings["CertificatePswd"];
defaultMessage = ConfigurationManager.AppSettings["DefaultPubMessage"];
topicSubFromHar = ConfigurationManager.AppSettings["MqttTopicSubFromHar"];
topicThisHost = ConfigurationManager.AppSettings["MqttTopicThisHost"];
heartBeatPubMsg = ConfigurationManager.AppSettings["HeartBeatPubMessage"];
heartBeatTimerMs = Double.Parse(ConfigurationManager.AppSettings["HeartBeatTimerMs"]);
pingDicom = bool.Parse(ConfigurationManager.AppSettings["CheckDicomServers"]);
SynergyHostName = ConfigurationManager.AppSettings["SynergyHostName"];
machineName = Dns.GetHostName();
hostIp = Dns.GetHostEntry(machineName)
.AddressList
.FirstOrDefault(ip => ip.AddressFamily == AddressFamily.InterNetwork)
.ToString();
clientId = ".netWinSvc-" + this.machineName;
QosThisHost = MqttQualityOfServiceLevel.AtLeastOnce;
switch (MqttQos)
{
case 0:
QosThisHost = MqttQualityOfServiceLevel.AtLeastOnce;
break;
case 1:
QosThisHost = MqttQualityOfServiceLevel.AtMostOnce;
break;
case 2:
QosThisHost = MqttQualityOfServiceLevel.ExactlyOnce;
break;
}
}
public void CreateThreadAndRun()
{
Thread m_Thread = new Thread(new ThreadStart(StartPublisherAndSubscriber));
m_Thread.SetApartmentState(ApartmentState.STA);
m_Thread.Name = "MT";
m_Thread.Priority = ThreadPriority.Highest;
m_Thread.Start();
}
private void StartPublisherAndSubscriber()
{
StartSubscriber();
_ = StartPublisher();
CheckOtherServers();
}
private void StartHeartBeatTimer()
{
TimeSpan ts = new TimeSpan(0, 0, 5);
Thread.Sleep(ts);
Timer timer = new Timer();
timer.Elapsed += new ElapsedEventHandler(PublishHeartBeat);
timer.Interval = heartBeatTimerMs;
timer.Enabled = true;
}
private void PublishHeartBeat(object source, ElapsedEventArgs e)
{
var message = $"{ this.heartBeatPubMsg}: { MyHostName} {hostIp}";
_ = this.Publish(message, topicThisHost);
this.CheckOtherServers();
}
private async void StartSubscriber()
{
applog.Debug($"In StartSubscriber()");
var mqttFactory = new MQTTnet.MqttFactory();
managedMqttClientSubscriber = mqttFactory.CreateManagedMqttClient();
managedMqttClientSubscriber.ConnectedAsync += OnSubscriberConnected;
managedMqttClientSubscriber.DisconnectedAsync += OnSubscriberDisconnected;
managedMqttClientSubscriber.ApplicationMessageReceivedAsync += this.OnSubscriberMessageReceived;
// If tls is enabled in app.config, we use wss with cert file
if (MqttUseTls)
{
var managedClientOptions = WsSecureClientOptions();
await managedMqttClientSubscriber.StartAsync(managedClientOptions);
}
else
{
var insecureOptions = WsInsecureOptions();
await this.managedMqttClientSubscriber.StartAsync(
new ManagedMqttClientOptions
{
ClientOptions = insecureOptions
});
}
List<MqttTopicFilter> topicFilter = new List<MqttTopicFilter>();
topicFilter.Add(new MqttTopicFilter { Topic = topicThisHost });
topicFilter.Add(new MqttTopicFilter { Topic = topicSubFromHar });
Console.WriteLine("We have subscribed to multiple !");
await this.managedMqttClientSubscriber.SubscribeAsync(topicFilter);
}
public async Task StartPublisher()
{
var mqttFactory = new MqttFactory();
this.managedMqttClientPublisher = mqttFactory.CreateManagedMqttClient();
// If tls is enabled in app.config, we use wss with cert file
if (MqttUseTls)
{
var managedClientOptions = WsSecureClientOptions();
managedClientOptions.AutoReconnectDelay = TimeSpan.FromSeconds(10);
await this.managedMqttClientPublisher.StartAsync(managedClientOptions);
}
else
{
var insecureOptions = WsInsecureOptions();
await this.managedMqttClientPublisher.StartAsync(
new ManagedMqttClientOptions
{
ClientOptions = insecureOptions
});
}
applog.Debug($"In StartPublisher()");
await Publish($"{defaultMessage} - Machine: {this.machineName}, Host: {this.SynergyHostName}", this.topicThisHost);
}
public async Task Publish(string messageIn, string topic, IManagedMqttClient pubClient = null)
{
MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce;
switch (MqttQos)
{
case 0:
qos = MqttQualityOfServiceLevel.AtLeastOnce;
break;
case 1:
qos = MqttQualityOfServiceLevel.AtMostOnce;
break;
case 2:
qos = MqttQualityOfServiceLevel.ExactlyOnce;
break;
}
MqttModel message = new MqttModel();
message.message = messageIn;
message.datestamp = DateTime.Now;
message.source = "";
message.status = "";
var payload = JsonConvert.SerializeObject(message, Formatting.Indented);
var send = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qos)
.WithRetainFlag(false)
.Build();
if (this.managedMqttClientPublisher == null)
{
this.managedMqttClientPublisher = pubClient;
}
if (this.managedMqttClientPublisher != null)
{
try
{
applog.Debug($"Mqtt Service Publish() method - about to pub mqtt message EnqueueAsync() - {messageIn} / {topic} ");
await this.managedMqttClientPublisher.EnqueueAsync(send);
MonitoringLogs logs = new MonitoringLogs();
logs.InsertIntoLog(message);
}
catch (Exception ex)
{
string errorMessage = $"Exception occured in Publish() method. {ex.Message}";
applog.Error(errorMessage);
throw new Exception(errorMessage);
}
}
else
{
applog.Info($"Mqtt Service Publish() method - managedMqttClientPublisher object appears to be NULL");
}
}
public ManagedMqttClientOptions WsSecureClientOptions()
{
string assemblyPath = Path.GetDirectoryName(Assembly.GetAssembly(typeof(MqttService)).CodeBase);
// Building out the secure wss url (both pfx/crt certificate file types appear to work here)
var url = $"{mqttBrokerAddress}:{mqttPort}/mqtt";
X509Certificate2 x509Cert = null;
var file = CertificateFileName;
var filePath = Path.Combine(assemblyPath, file).Remove(0, 6);
// pfx file contains both pub and priv keys (needs pswd); crt file only has pub key (no pswd req'd)
if (Path.GetExtension(CertificateFileName.ToLower()) == ".pfx")
{
// using a PFX cert file via the X509 class
x509Cert = new X509Certificate2(filePath, CertificatePwd);
}
else if (Path.GetExtension(CertificateFileName.ToLower()) == ".crt")
{
x509Cert = new X509Certificate2(filePath);
}
applog.Debug($"In WsSecureClientOptions(), Certificate Path - {filePath}");
var clientOptionsBldr = new MqttClientOptionsBuilder()
.WithProtocolVersion(MqttProtocolVersion.V500)
.WithWebSocketServer(url)
.WithCredentials(mqttClientUser, mqttClientPswd)
.WithClientId(clientId)
.WithCleanSession()
.WithCredentials(mqttClientUser, mqttClientPswd)
.WithTls(
new MqttClientOptionsBuilderTlsParameters()
{
UseTls = true,
SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
Certificates = new List<X509Certificate2>() { x509Cert }
});
ManagedMqttClientOptions managedClientOptions = null;
try
{
applog.Debug($"In WsSecureClientOptions(), about to Build Publisher - ${url}");
managedClientOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(clientOptionsBldr)
.Build();
}
catch (Exception ex)
{
applog.Error("CERT ERROR ! Exception in WsSecureClientOptions() " + ex.Message);
}
return managedClientOptions;
}
private Task OnSubscriberConnected(MqttClientConnectedEventArgs _)
{
return Task.CompletedTask;
}
private Task OnSubscriberDisconnected(MqttClientDisconnectedEventArgs _)
{
return Task.CompletedTask;
}
}
根据注释,您的代码正在创建到代理的两个连接;
StartSubscriber()
StartPublisher()
两个函数最终都使用相同的客户端id创建到代理的连接(因为它们都使用相同的WsSecureClientOptions()
/WsInsecureOptions()
)。
MQTT-3.1.4-2状态:
如果客户端已经连接到服务器,那么服务器必须断开现有的客户端。
所以这两个连接将以"争斗"告终。-一个会连接,导致另一个断开连接,它会尝试重新连接等
要解决这个问题:
- 使用一个连接(订阅/发布不需要使用单独的连接)
- 使用不同的客户端ID