ZeroMQ+NodeJs:订阅服务器未接收到消息



我的发布者(zpub.js(在一个循环中发布,如下所示。

async function publishLoop() {
let payload = []
_.forEach(array, (a) => {
// process a here to generate someKey and someValue
payload.push({someKey:someValue})
})
return Promise.all(payload.map(async (p) => {
await zmqp.publish({t:'topicString', m:p})
}))
}

CCD_ 2简单地如下。

async publish(payload) {
// this.sock is just a bind to tcp://127.0.0.1:4030
await this.sock.send([payload.t, JSON.stringify(payload.m, null, null)])
return Promise.resolve()
}

我的订阅者(zsub.js(是ZeroMQ网站上的代码版本。

const zmq = require("zeromq")
const mom = require('moment-timezone')
async function run() {
const sock = new zmq.Subscriber
sock.connect("tcp://127.0.0.1:4030")
sock.subscribe("topicString")
for await (const [topic, msg] of sock) {
console.log(`${mom().tz('Asia/Kolkata').format('YYYY-MM-DDTHH:mm:ss.SSS')}`)
}
}
run()
  1. 我以node zsub.js > out启动我的订阅者
  2. 我以node zpub.js的身份推出了我的发行商。已成功接收所有消息
  3. zpub.js进程结束,但zsub.js仍在运行。当我重新运行node zpub.js时,订阅者没有收到任何消息out中的记录数保持不变
  4. 再次运行zpub.js一次或两次似乎可以向订阅者传递消息(最近的消息;而不是时间戳中看到的早期消息(

因此,我不确定在pub/sub端应该做些什么,这样消息就不会"丢失"。请告知。

Pub-Sub本质上是不可靠的,因为订阅者没有向发布者反馈消息已收到。本指南对此进行了详细描述,并提供了一些解决方案。其中一个解决方案是不使用Pub-Sub,而是使用Router Dealer。这取决于您的用例,这是否是一个可行的替代方案。

关于您的特定问题,订阅者最终确定与发布者的连接已丢失,并将尝试重新连接,直到重新建立连接。根据时间的不同,订阅者可能会错过发布者发送的初始(或所有(消息。

一般来说,如果出版商是通信的稳定部分(像服务器一样保持在线(,订阅者可以来来去去(像客户端一样(,Pub-Sub的工作效果最好。

最新更新