引入消息之间的延迟



我使用Node.js。我有一个MQTT消息事件处理程序
index.js

client.on('message', function (topic, message) {
// calls another function
my_function(topic,message);
})

其在接收消息时调用另一个函数CCD_ 2。

async function my_function(topic,message) {
const value = await dataFromPLC();
///processes the value together with message
}

使用exports.dataFromPLC = dataFromPLC从另一个文件导出并导入到我的主函数中的函数dataFromPLC如下所示
PLCfunctions.js

let client = new S7Client(plcSettings);
client.on('error', console.error);

async function dataFromPLC (){
try {
await client.connect();
} catch (err){
console.error(err);
}

try {
// Read DB
const res = await client.readDB(dbNr, dbVars);
return res;
} catch (err) {
console.error(err);
} finally {
client.disconnect();
}
}

当我接收到单个MQTT消息或者消息之间有足够的延迟时,没有问题。然而,当我收到两条MQTT消息时,它们都调用my_function,随后调用dataFromPLC,其间没有太多延迟。我收到一个错误,因为在第二条消息尝试再次使用连接之前,没有足够的时间关闭PLC连接。我考虑过不同的选择,不太确定如何解决这个问题。我能得到一些帮助吗?

您必须设置一个消息队列,以便onMessage只将输入放入队列中,并将其处理推迟到以后。例如,您可以使用then作为入队操作使队列成为Promise。通过这种方式,可以保证在之前的所有处理都完成之前不会启动任何处理。

这里有一个小演示,点击按钮模拟传入消息:

let QUEUE = Promise.resolve()
function onMessage(msg) {
console.log('GOT MESSAGE', msg)
QUEUE = QUEUE.then(() => process(msg))
}
let pause = n => new Promise(r => setTimeout(r, n));
async function process(msg) {
console.log('BEGIN', msg)
await pause(200); console.log('busy', msg)
await pause(200); console.log('busy', msg)
await pause(200); console.log('busy', msg)
await pause(200); console.log('busy', msg)
await pause(200); console.log('busy', msg)
console.log('END', msg)
}
msg = 0
document.querySelector('button').addEventListener('click', () => onMessage(++msg))
<button>message</button>

最新更新