使用多个 MQTT 节点红色节点的行为不符合预期



我有多个 MQTT 节点,其中配置了不同的主题。现在我将处理多个主题的值并找出一些假设(基本上是流分析)。

我的期望:

我知道java脚本是单线程的。所以我认为当收到一个主题数据时,它将被处理,然后只有在完成后才会收到其他主题,依此类推。

现实:

它像多线程一样工作。

测试用例:

流程:MQTT ---> 第二个--->输出进程

睡眠函数代码(不是真的睡眠更像是处理):

var start = new Date().getTime();
for (var i = 0; i < 1e7; i++)
{
if ((new Date().getTime() - start) > 1000)
{
break;
}
}
return msg;

现在,我将使用 for 循环连续发布数据形式 1 到 100。

我的期望:

现在 1、2、3....100 将以 1 秒的间隔一个接一个地显示。因此,现在大约需要 100 秒才能显示从 1 到 100 的值。

现实:

首先,它将休眠 100 秒,然后形成 1 到 100 所有内容将同时显示。那么这里发生了什么?

流 json:

[{"id":"e9a53835.09af38","type":"tab","label":"Flow 1","disabled":false,"info":""},{"id":"5ffb1b40.1405b4","type":"debug","z":"e9a53835.09af38","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","x":438,"y":216,"wires":[]},{"id":"a8406277.a78ee","type":"mqtt in","z":"e9a53835.09af38","name":"Test MQTT Queue","topic":"1","qos":"2","broker":"b4c58fab.26844","x":146,"y":120,"wires":[["629e90bb.996ad"]]},{"id":"629e90bb.996ad","type":"function","z":"e9a53835.09af38","name":"Sleep 1 seconds","func":"var start = new Date().getTime();nfor (var i = 0; i < 1e7; i++)n{n    if ((new Date().getTime() - start) > 1000)n    {n      break;n    }n}nreturn msg;","outputs":1,"noerr":0,"x":298,"y":168,"wires":[["5ffb1b40.1405b4"]]},{"id":"b4c58fab.26844","type":"mqtt-broker","z":"","name":"","broker":"127.0.0.1","port":"1883","clientid":"","usetls":false,"compatmode":true,"keepalive":"60","cleansession":true,"willTopic":"","willQos":"2","willRetain":"false","willPayload":"","birthTopic":"","birthQos":"2","birthRetain":"false","birthPayload":""}]

C# 发布者函数:

// Retain: false, QOS= 2 on both publisher and client.
for (int i = 1; i <= 10; i++)
{
client.Publish(1, Encoding.UTF8.GetBytes(i.ToString()), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
}
  1. 第一条消息到达并传递给函数节点,然后函数节点执行繁忙等待循环,不允许节点.js事件循环处理任何其他工作。
  2. 在此期间,剩余的 99 条消息到达底层 MQTT 客户端,内部事件排队处理它们
  3. 然后,第一条消息最终到达"调试"节点。Debug 节点将消息异步传递到 websocket - 这意味着该部分工作被包装在一个事件中,并放在节点.js事件队列的末尾 - 在 99 条消息后面。
  4. 接下来的 99 个事件也会发生同样的情况 - 它们被同步处理,节点.js事件循环没有机会取得进展,每个事件都会在队列末尾添加另一个事件,以将消息传递给 Debug
  5. 处理最后一条消息,node.js 事件循环到达事件以通过 websocket 处理调试消息,所有 100 条消息都显示在"调试"侧栏中

这里的关键是,在节点.js世界中同步阻塞是一件坏事。如果要延迟消息,请使用 Delay 节点,该节点使用计时器执行此操作 - 从而允许 node.js 在后台继续处理其他工作。

最新更新