为什么流数据在 Node js 中不能连续工作?



一直试图在连续阅读数据时几天,但尚未找到解决方案。实际上,我对此非常糟糕。我正在使用套接字IO进行实时数据更新。对于正常传输的节点JS,我的数据非常大,因此我必须将数据流式传输。问题在于,插座首次加载后停止更新新的JSON数据。数据更新在第一次加载中正常。除非我重新启动节点,否则块数据停止传输。

这是我的完整服务器文件:

var EventEmitter = require('events');
const emitter = new EventEmitter()
emitter.setMaxListeners(0)
var express = require('express'); 
var app = express();
var fs = require('fs');
var readableStream = fs.createReadStream('./edata.json')
var options = {
    key: fs.readFileSync('/etc/letsencrypt/live/xsite.com/privkey.pem'),
    cert: fs.readFileSync('/etc/letsencrypt/live/xsite.com/fullchain.pem'),
    ca: fs.readFileSync('/etc/letsencrypt/live/xsite.com/chain.pem')
};
var https = require('https').Server(options, app);
var zmq = require('zeromq')
  , sock = zmq.socket('pull');
  sock.bind('tcp://10.150.0.6:1111');
var io = require('socket.io')(https); 
io.on('connection', function(socket){ 
     socket.on('disconnect',function(){
        console.log("client disconnected");
    })  
    sock.on('message',function(chunk){  
    readableStream.on('data',chunk=>{
        console.log("working emitter", chunk.toString())
     socket.emit('latest_score',chunk.toString());      
    })
   });  
});
  https.listen(1969);
  sock.on('connect', function(fd, ep) {console.log('connect, endpoint:', ep);});
console.log('Server connected to port 1969');

如果这里有人可以建议我如何在不重新启动节点的情况下连续传输数据。善良的建议将不胜感激。

您的设计如何处理事件可能是错误的。我将逐步解释您的设计的作用。

  1. 您定义并初始化readableStream以流式传输文件。由于没有data事件处理程序,它尚未开始流动。因此,流最初正在等待。
  2. 您将sock定义为Zeromq套接字。
  3. 您定义了socket.io服务器。
  4. 您定义了io.on('connection', ...)侦听器,因此如果输入socket.io Connections。
  5. io.on('connection', ...)的内部,您可以执行sock.on('message', function(chunk){{...});,它使您成为ZEROMQ套接字上传入的message事件的听众。
  6. 在该Zeromq侦听器内部,您设置了readableStream.on('data',chunk=>{ socket.emit(...)});。因此,这是readableStream上的第一个data事件侦听器,因此它将启动流动流,您将获得一系列data事件,您将在socket.io socket上发送它们。
  7. 假设没有其他Zeromq事件,也没有其他套接字。IO插座已经到达,那么上述步骤中事件中的readableStream数据将全部贯穿,您将全部发送到socket.io套接字上。
  8. 现在,想象一个socket.io连接在已经发送了readableStream的所有内容之后(流到达末端并完成(。此时,您将再次等待Zermomq消息。发生这种情况时,您将在readableStream上注册另一个data事件处理程序。但是,ReadableStream已经完成并关闭。它没有更多的 data事件,因此没有发送第二个socket.io连接。
  9. 如果您从第一个开始,在流过程中间连接了多个socket.io连接,
  10. 甚至更糟的事情可能会发生。您的第二个socket.io连接可能仅获得数据流的一部分。
  11. ,而且,我不知道您的Zeromq message听众应该在这里做什么,因为没有什么可以开始流到第一个socket.io连接,直到该消息发生为止。

因此...如何解决此问题取决于您真正希望每个新套接字连接的行为是什么。您将不得不充分解释这一点,并解释这与Zeromq消息有什么关系,以建议我们提出固定的设计。但是,希望对您现有代码的这种解释将解释您为什么看到自己的行为。

如果要将数据从readableStream发送到每个新的socket.io连接,则必须为每个socket.io连接创建一个新的和独立的readableSream。我仍然不知道这与Zeromq有什么关系,以及为什么您在收到Zeromq消息或在数据流中的另一个Zeromq消息时不会启动任何内容。

最新更新