我在循环中有一个MQTT调用,在每次迭代中,它都应该从订阅者返回一个响应,这样我就可以使用发布后转发的值。但问题是我不知道该怎么做
我希望你有一个想法,或者如果我只是没有正确实施,请你指导我度过难关。谢谢
这是我的代码:
// MyClientMgr
class MyClientMgr{
public long CurrentOutput { get; set; }
public void GetCurrentOutput(MyObjectParameters parameters, MqttClient client)
{
MyMessageObject msg = new MyMessageObject
{
Action = MyEnum.GetOutput,
Data = JsonConvert.SerializeObject(parameters)
}
mq_GetCurrentOutput(msg, client);
}
private void mq_GetCurrentOutput(MyMessageObject msg, MqttClient client)
{
string msgStr = JsonConvert.SerializeObject(msg);
client.Publish("getOutput", Encoding.UTF8.GetBytes(msgStr),
MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
client.MqttMsgPublishReceived += (sender, e) =>{
MyObjectOutput output = JsonConvert.DeserializeObject<MyObjectOutput>(Encoding.UTF8.GetString(e.Message));
CurrentOutput = output;
};
}
}
// MyServerMgr
class MyServerMgr
{
public void InitSubscriptions()
{
mq_GetOutput();
}
private void mq_GetOutput()
{
MqttClient clientSubscribe = new MqttClient(host);
string clientId = Guid.NewGuid().ToString();
clientSubscribe.Connect(clientId);
clientSubscribe.Subscribe(new string[] { "getOutput" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
MqttClient clientPublish = new MqttClient(host);
string clientIdPub = Guid.NewGuid().ToString();
clientPublish.Connect(clientIdPub);
clientSubscribe.MqttMsgPublishReceived += (sender, e) => {
MyMessageObj msg = JsonConvert.DeserializeObject<MyMessageObj>(Encoding.UTF8.GetString(e.Message));
var output = msg.Output;
clientPublish.Publish("getOutput", Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(output)), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
}
}
}
// MyCallerClass
class MyCallerClass
{
var host = "test.mqtt.org";
var myServer = new MyServerMgr(host);
var myClient = new MyClientMgr();
myServer.InitSubscriptions();
MqttClient client = new MqttClient(host);
for(int i = 0; i < 10; i++)
{
long output = 0;
MyObjectParameters parameters = {};
myClient.GetCurrentOutput(parameters, client) // here I call the method from my client manager
// to publish the getting of the output and assigned
// below for use, but the problem is the value doesn't
// being passed to the output variable because it is not
// yet returned by the server.
// Is there a way I could wait the process to
// get the response before assigning the output?
output = myClient.CurrentOutput; // output here will always be null
// because the response is not yet forwarded by the server
}
}
我的调用者类中有一个循环,用于调用mqtt发布以获取输出,但我不知道如何在分配输出之前获取输出,我想先等待响应,然后再进行下一个。
我已经试过这样在里面循环一段时间了:
while(output == 0)
{
output = myClient.CurrentOutput;
}
是的,我可以在这里得到输出,但这会大大减慢过程。有时它会失败。
请帮帮我。谢谢。
看起来您正在尝试通过异步协议(MQTT)进行同步通信。
我的意思是,您希望发送一条消息,然后等待响应,这不是MQTT的工作方式,因为在协议级别没有对消息的回复的概念。
我对C#不太熟悉,所以我只对可能的解决方案进行抽象描述。
我的建议是使用发布线程wait/pulse(查看Monitor类)在每次发布后都有这个块,并在收到响应时让消息处理程序调用pulse。
如果响应不包含识别原始请求的等待,则还需要一个状态机变量来记录正在进行的请求。
你可能想考虑暂停等待,以防对方因某些原因没有回应。
您可以使用具有WaitOne()和Set()方法的AutoResetEvent类。发布后使用WaitOne()将等待消息发布,在client_MqttMsgPublishReceived事件下使用Set()将在订阅者收到他订阅的消息时释放等待。