读取CSV文件并使用websocket(Node,Socket.io)每隔一段时间发送数据



我对Node和Express.js相对陌生。我正在尝试创建一个websocket服务器,以不规则的间隔逐行推送存储在文件中的CSV数据。CSV结构如下:[timeout[ms],data1,data2,data3…]

我已经成功地创建了一个与客户端通信的websocket服务器。

我正在寻找一个最好的解决方案来有效地完成以下任务:1.读取CSV文件的一行2.用WebSockets发送一行3.暂停读取存储在行的第一个值中的一段时间4.在间隔结束后继续读取,然后返回步骤1。

到目前为止,我已经走到了这一步(请随意丢弃我的代码,因为它可能是非常错误的——正如我所说,我是新手。看起来pause()什么都没做。

var $    = require('jquery')
,csv = require('csv');
exports.index = function(server){
  var io   = require('socket.io').listen(server);
  io.sockets.on('connection', function (socket) {
  socket.on('startTransmission', function(msg) {
    csv()
    .from.path('C:/dev/node_express/csv/test.csv', { delimiter: ',', escape: '"' })
    .on('record', function(row,index){
      var rowArray = $.parseJSON(JSON.stringify(row));
      var json = {},
          that = this;
        $.each(rowArray, function(i,value){
          json[keys[i]] = value;
        });
        socket.emit('transmitDataData', json);
        //this.pause(); //I guess around here is where I'd like to pause 
        // setTimeout(function(){
        //   that.resume();  //and resume here after the timeout, stored in the first value (rowArray[0])    
        // }, rowArray[0]);
    });
});
});
};

不幸的是,注释掉的代码不起作用-所有数据都会立即逐行发送,函数不会暂停

我在另一个用例中遇到了同样的事情。问题是,对流调用pause()会暂停底层流读取,但不会暂停csv记录解析,因此record事件可以与构成最后读取流块的其余记录一起调用。在我的情况下,我同步了它们,如下所示:

var rows=0, actions=0;
stream.on('record', function(row, index){                                                                 
    rows++;                                
    // pause here, but expect more record events until the raw read stream is exhausted
    stream.pause();
    runner.do(row, function(err, result) {                                                 
        // when actions have caught up to rows read, read more rows.
        if (actions==rows) {
            stream.resume();
        }                    
    });
});

在你的情况下,我会缓冲这些行,然后用计时器释放它们。这里有一个未经测试的重新保理,只是为了让你了解我的意思:

var $ = require('jquery'),
    csv = require('csv');
exports.index = function(server){
  var io = require('socket.io').listen(server);
  io.sockets.on('connection', function (socket) {
      socket.on('startTransmission', function(msg) {
        var timer=null, buffered=[], stream=csv().from.path('C:/dev/node_express/csv/test.csv', { delimiter: ',', escape: '"' });
        function transmit(row) {        
            socket.emit('transmitDataData', row);                                     
        }       
        function drain(timeout) {                                                    
            if (!timer) {
                timer = setTimeout(function() {                                    
                    timer = null;
                    if (buffered.length<=1) { // get more rows ahead of time so we don't run out. otherwise, we could skip a beat.
                        stream.resume(); // get more rows
                    } else {                        
                        var row = buffered.shift();
                        transmit(row);
                        drain(row[0]);                        
                    }
                }, timeout);               
            }                
        }
        stream.on('record', function(row,index){                        
            stream.pause();                                                                                   
            if (index == 0) {                            
                transmit(row);                                               
            } else {                            
                buffered.push(row);                                   
            }                                                       
            drain(row[0]); // assuming row[0] contains a timeout value.                                                                  
        });
        stream.on('end', function() {
            // no more rows. wait for buffer to empty, then cleanup.
        });
        stream.on('error', function() {
            // handle error.
        });
    });
};

相关内容

  • 没有找到相关文章