Node.js 和 MQTT - "level":"error","message":"uncaughtException:发送到客户端后无法设置标头



这是我的冰箱控制器:

const fs = require('fs');
const mqtt = require('mqtt');
const transporter = require('../params/mail')
const winston = require('../params/log');
const User = require("../models/User");
const { cli } = require('winston/lib/winston/config');
exports.OpenTheCase = async (req, res) => {};
exports.AddCard = async (req, res) => {};
exports.ShowCurrentTemperature = async (req, res) => {};
exports.ShowCurrentHumidity = async (req, res) => {};
exports.SetAlarm = async (req, res) => {};
exports.AlarmIsOn = async (req, res) => {};
const options = {
clientId: 'backendserver1032',
key: fs.readFileSync('./certs/mqtt_cert/client.key'),
cert: fs.readFileSync('./certs/mqtt_cert/client.crt'),
ca: [ fs.readFileSync('./certs/mqtt_cert/ca.crt') ]
}
const client = mqtt.connect('mqtts://localhost:8883', options);
exports.OpenTheCase = async (req, res) => {
try {
client.publish('RFID', 'RFID_OPEN');
res.status(200).json({ 'case':"opened" });
}
catch(e){
res.status(200).json({ 'state':"something went wrong" });
}
}
exports.AddCard = async (req, res) => {
try {
client.publish('RFID', 'RFID_ADD');
res.status(200).json({ 'card':"will be added" });
}
catch(e){
res.status(200).json({ 'state':"something went wrong" });
}
}
exports.ShowCurrentTemperature = async (req, res) => {
try {
client.subscribe('temperature');
client.on('message', (topic, message, packet) => {
res.status(200).json({ 'temperature': message.toString('ascii') })
client.unsubscribe('temperature')
})
}
catch(e){
res.status(200).json({ 'state':"something went wrong" });
}
return
}
exports.ShowCurrentHumidity = async (req, res) => {
try {
client.subscribe('humidity');
client.on('message', (topic, message) => {
res.status(200).json({"temperature": message.toString('ascii')});
client.unsubscribe('humidity')
});
}
catch(e){
res.status(200).json({ 'state':"something went wrong" });
}
return
}

问题是:当我试图得到ShowCurrentTemperature",它的工作一次和之后。它表示http报头已经发送。

这是我的路线:

router.get("/frigo/Temperature",auth.verifyToken, frigoController.ShowCurrentTemperature)

我真的很感谢你。

我尝试了几件事,比如添加返回或试图结束连接,但它们都不起作用。我有点不知所措了。如果有人能帮我度过这个难关。

这个设计是不会成功的。

你基本上泄露了client.on('message',...)处理程序。每次对HTTP端点的调用都会添加一个新的处理程序,该处理程序保存对其本地res对象的引用。

调用unsubscribe()不会删除消息处理程序,因此当您在HTTP路由中再次调用subscribe()时,下一个到达的消息将被传递给所有旧处理程序,这些处理程序将在已经完成的HTTP事务上尝试调用res.send()

您正在尝试将固有异步协议(MQTT)映射到同步HTTP请求,这是一个非常糟糕的主意。

可能能够通过交换所有的client.on('message', ...)调用到client.once('message', ....),使处理程序只触发一次,但这是一个真正的UGLY黑客。(编辑:反思,这仍然有一个竞争条件,你可能最终处理错误的消息,如果两个HTTP端点被称为太接近在一起,所以我坚持我的第一个声明,这种设计永远不能正常工作)

正确的做法是完全独立于HTTP请求运行整个MQTT客户端,并且只让单个后台client.on('message',...)处理程序用HTTP路由可以返回的最新值更新一些全局变量,而不是尝试抓取下一个发布的消息。

简而言之,当您尝试使用res发送多个响应时,就会发生此错误。当client.unsubscribe('temperature')期间出现异常时,可能会在代码中发生这种情况,因为这时执行将进入发送另一个响应的catch。您可以通过将unsubscribe移动到try-catch:

之后来避免这种情况。
exports.ShowCurrentTemperature = async(req, res) => {
try {
client.subscribe('temperature');
client.on('message', (topic, message, packet) => {
res.status(200).json({
'temperature': message.toString('ascii')
})
})
} catch (e) {
res.status(200).json({
'state': "something went wrong"
});
}
client.unsubscribe('temperature')
}
更新:

实际上,更可能的解释是,您在取消订阅之前收到了不止一条消息,因此第一个res..被执行了多次。所以最好在回复之前退订。由于同时可能会有更多处理程序排队,因此您可能还需要添加一个保护,以确保永远不会发送多个响应:

exports.ShowCurrentTemperature = async(req, res) => {
let done = false;
try {
client.subscribe('temperature');
client.on('message', (topic, message, packet) => {
client.unsubscribe('temperature')
!done && res.status(200).json({
'temperature': message.toString('ascii')
})
done = true;
})
} catch (e) {
!done && res.status(200).json({
'state': "something went wrong"
});
}
}

顺便说一句,您也可以直接在客户端上使用mqtt,如果从授权的角度来看可以的话,这样会更优雅。

最新更新