订阅 MQTT 代理并获取正在传递的数据



我正在使用节点.js模拟器生成数据并将此数据传递给http路由/simulator/data

在应用程序中,我正在使用MQTT mqtthandler.js文件侦听代理,我在下面分享。

//This is mqtthandler.js file
const mqtt = require("mqtt");
class MqttHandler {
constructor() {
this.mqttClient = null;
this.host = "mqtt://localhost:1883";
this.username = "YOUR_USER"; // mqtt credentials if these are needed to connect
this.password = "YOUR_PASSWORD";
}
connect() {
// Connect mqtt with credentials (in case of needed, otherwise we can omit 2nd param)
this.mqttClient = mqtt.connect(this.host, {
username: this.username,
password: this.password,
});
// Mqtt error calback
this.mqttClient.on("error", (err) => {
console.log(err);
this.mqttClient.end();
});
// Connection callback
this.mqttClient.on("connect", () => {
console.log(`mqtt client connected`);
});
// mqtt subscriptions
this.mqttClient.subscribe("value", { qos: 0 });
// When a message arrives, console.log it
this.mqttClient.on("message", function (topic, message) {
console.log(message.toString());
});
this.mqttClient.on("close", () => {
console.log(`mqtt client disconnected`);
});
}
// Sends a mqtt message to topic: mytopic
sendMessage(message) {
this.mqttClient.publish("value", message);
}
}
module.exports = MqttHandler;

当模拟器将数据发送到/simulator/data路由时,我正在获取值并向带有值主题的代理发送。我在下面分享模拟器的后请求代码和输出。

var mqttHandler = require("../mqtthandler");
module.exports = function (app) {
app.get("/simulator", function (req, res) {
res.render("iot/simulator");
});
// route to display all the data that is generated
app.get("/simulator/data", require("./controllers/data").all);
var mqttClient = new mqttHandler();
mqttClient.connect();
// route to write data to the database
app.post(
"/simulator/data",
require("./controllers/data").write,
(req, res) => {
mqttClient.sendMessage(req.body.value);
res.status(200).send("Message sent to mqtt");
}
);
// delete the data when the stream is stopped or when the app is closed
app.get("/simulator/data/delete", require("./controllers/data").delete);
};

当我向/simulator/data发送 get 请求时,我能够看到生成的数据,但是这些数据没有发送到代理。

//This is output of simulator
[
{
"_id": "5ecfadc13cb66f10e4d9d39b",
"value": "1.886768240197795",
"__v": 0,
"categories": []
},
{
"_id": "5ecfadc23cb66f10e4d9d39c",
"value": "7.351404601932272",
"__v": 0,
"categories": []
}
]

PS:代理是通过红色节点创建的

我想将此数据传递给代理并查看 MQTT 订阅的结果。但是我找不到我在哪里犯了错误。

您的解决方案是修复您的开发过程。与其从故障调试 2 个子系统(您的发布者/模拟器和订阅者(开始工作,不如从成功开始工作:

1(使用您知道有效的发布商,例如。mosquitto_pub,任何有效的模拟器等。

2(使用您知道工作的订阅者,例如。mosquitto_sub

这将在几分钟内解决您的问题,而不是几小时或几天,并让您专注于您真正想要开发的代码。

所以这里有几件事要看。

  1. 您的this.mqttClient.subscribe()通话与您的connect()on.("error",...)on.("message",...)等一致。 所以订阅((可以在连接((完成之前触发...因此,您将永远不会订阅。 将subscribe()放入connect()块内。

  2. 您正在订阅"价值",这不是一个正确的 MQTT 主题。 如果必须,请使用value/#表示 subscribe((,使用 "value/.." 表示 publish((。 您的类只允许单个硬编码主题,因此当您想将该类重用于其他项目时,它不是很有用。 现在花点时间将主题字符串传递给类。

最新更新