我在实现 ACK 以获得消息是使用 MQTT 传递的反馈时遇到问题。我的概念在于在发送方发送的消息中提供一个 ID,以便接收方在不同的通道上发回具有相同 id 的 ACK。现在发生的问题是,当我收到确认时,我无法中断侦听事件。
到目前为止,我的代码是
let mqtt = require('async-mqtt')
, cfg = require('./cfg');
let client = mqtt.connect(cfg.server);
client.subscribe('some/other/topic');
client.on('connect', sendWithAck)
let id = 123;
async function sendWithAck() {
try {
await client.publish('some/topic', `Message with id${id}`, () => {
client.on('message', (topic, msg) => {
console.log(`${topic}> ${msg.toString()}`);
//this.stopPropagation(); //doesn't work
})
});
await client.end();
console.log('done');
} catch(e) {
console.log('error', e);
process.exit();
}
}
这种方法是行不通的,因为如果另一端从未响应(例如崩溃)会发生什么。在 MQTT 协议级别无法知道这一点,这与 HTTP 等同步协议不同。
正确的方法是在调用subscribe
之前设置on('message')
侦听器,并使用状态机记录已发送消息的 ID,并在响应传入时将其删除。通过这种方式,您可以设置一个计时器以允许响应超时并适当地处理它们。