这是我的冰箱控制器:
const fs = require('fs');
const mqtt = require('mqtt');
const transporter = require('../params/mail')
const winston = require('../params/log');
const User = require("../models/User");
const { cli } = require('winston/lib/winston/config');
exports.OpenTheCase = async (req, res) => {};
exports.AddCard = async (req, res) => {};
exports.ShowCurrentTemperature = async (req, res) => {};
exports.ShowCurrentHumidity = async (req, res) => {};
exports.SetAlarm = async (req, res) => {};
exports.AlarmIsOn = async (req, res) => {};
const options = {
clientId: 'backendserver1032',
key: fs.readFileSync('./certs/mqtt_cert/client.key'),
cert: fs.readFileSync('./certs/mqtt_cert/client.crt'),
ca: [ fs.readFileSync('./certs/mqtt_cert/ca.crt') ]
}
const client = mqtt.connect('mqtts://localhost:8883', options);
exports.OpenTheCase = async (req, res) => {
try {
client.publish('RFID', 'RFID_OPEN');
res.status(200).json({ 'case':"opened" });
}
catch(e){
res.status(200).json({ 'state':"something went wrong" });
}
}
exports.AddCard = async (req, res) => {
try {
client.publish('RFID', 'RFID_ADD');
res.status(200).json({ 'card':"will be added" });
}
catch(e){
res.status(200).json({ 'state':"something went wrong" });
}
}
exports.ShowCurrentTemperature = async (req, res) => {
try {
client.subscribe('temperature');
client.on('message', (topic, message, packet) => {
res.status(200).json({ 'temperature': message.toString('ascii') })
client.unsubscribe('temperature')
})
}
catch(e){
res.status(200).json({ 'state':"something went wrong" });
}
return
}
exports.ShowCurrentHumidity = async (req, res) => {
try {
client.subscribe('humidity');
client.on('message', (topic, message) => {
res.status(200).json({"temperature": message.toString('ascii')});
client.unsubscribe('humidity')
});
}
catch(e){
res.status(200).json({ 'state':"something went wrong" });
}
return
}
问题是:当我试图得到ShowCurrentTemperature",它的工作一次和之后。它表示http报头已经发送。
这是我的路线:
router.get("/frigo/Temperature",auth.verifyToken, frigoController.ShowCurrentTemperature)
我真的很感谢你。
我尝试了几件事,比如添加返回或试图结束连接,但它们都不起作用。我有点不知所措了。如果有人能帮我度过这个难关。
这个设计是不会成功的。
你基本上泄露了client.on('message',...)
处理程序。每次对HTTP端点的调用都会添加一个新的处理程序,该处理程序保存对其本地res
对象的引用。
调用unsubscribe()
不会删除消息处理程序,因此当您在HTTP路由中再次调用subscribe()
时,下一个到达的消息将被传递给所有旧处理程序,这些处理程序将在已经完成的HTTP事务上尝试调用res.send()
。
您正在尝试将固有异步协议(MQTT)映射到同步HTTP请求,这是一个非常糟糕的主意。
你可能能够通过交换所有的client.on('message', ...)
调用到client.once('message', ....)
,使处理程序只触发一次,但这是一个真正的UGLY黑客。(编辑:反思,这仍然有一个竞争条件,你可能最终处理错误的消息,如果两个HTTP端点被称为太接近在一起,所以我坚持我的第一个声明,这种设计永远不能正常工作)
正确的做法是完全独立于HTTP请求运行整个MQTT客户端,并且只让单个后台client.on('message',...)
处理程序用HTTP路由可以返回的最新值更新一些全局变量,而不是尝试抓取下一个发布的消息。
简而言之,当您尝试使用res
发送多个响应时,就会发生此错误。当client.unsubscribe('temperature')
期间出现异常时,可能会在代码中发生这种情况,因为这时执行将进入发送另一个响应的catch
。您可以通过将unsubscribe
移动到try-catch:
exports.ShowCurrentTemperature = async(req, res) => {
try {
client.subscribe('temperature');
client.on('message', (topic, message, packet) => {
res.status(200).json({
'temperature': message.toString('ascii')
})
})
} catch (e) {
res.status(200).json({
'state': "something went wrong"
});
}
client.unsubscribe('temperature')
}
更新:实际上,更可能的解释是,您在取消订阅之前收到了不止一条消息,因此第一个res..
被执行了多次。所以最好在回复之前退订。由于同时可能会有更多处理程序排队,因此您可能还需要添加一个保护,以确保永远不会发送多个响应:
exports.ShowCurrentTemperature = async(req, res) => {
let done = false;
try {
client.subscribe('temperature');
client.on('message', (topic, message, packet) => {
client.unsubscribe('temperature')
!done && res.status(200).json({
'temperature': message.toString('ascii')
})
done = true;
})
} catch (e) {
!done && res.status(200).json({
'state': "something went wrong"
});
}
}
顺便说一句,您也可以直接在客户端上使用mqtt,如果从授权的角度来看可以的话,这样会更优雅。