对NodeJs中的流实现感到困惑

  • 本文关键字:实现 NodeJs node.js stream
  • 更新时间 :
  • 英文 :


我正在尝试实现通过套接字发送和接收文件的协议。协议是指定的,我不能更改。

我是NodeJs的新手,这就是我试图实现的方式。

我将编写一个双工流,并将文件放入其中。然后通过管道将其传输到套接字以发送数据。

困惑在于我应该在哪里读这个,在哪里写那个。如何知道读取文件已经完成,以及如何告诉套接字它已经完成。文档对我来说不是很清楚,谷歌搜索增加了更多的困惑:)

如有任何帮助,不胜感激。

注:我回家后会添加我自己的样品,我现在没有。

编辑

在@MattHarrison的回答之后,我将代码更改为:

var stream = require('stream');
var util = require('util');
var bufferpack = require('bufferpack');
var fs = require('fs');
var net = require('net');
var MyProtocolStream = function () {
    this.writtenHeader = false;            // have we written the header yet?
    stream.Transform.call(this);
};
util.inherits(MyProtocolStream, stream.Transform);
MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {
    if (!this.writtenHeader) {
        this.push('==== HEADER ====n');  // if we've not, send the header first
    }
    // Can this function be interrupted at this very line?
    // Then another _transform comes in and pushes its own data to socket
    // Corrupted data maybe then?
    // How to prevent this behavior? Buffering whole file before sending?
    var self = this;
    // I put a random timeout to simulate overlapped calls
    // Can this happen in real world?
    setTimeout(function () {
        self.push(chunk);  // send the incoming file chunks along as-is
        callback();
    }, Math.random()*10);
};
MyProtocolStream.prototype._flush = function (callback) {
    this.push('==== FOOTER ====n');      // Just before the stream closes, send footer
    callback();
};
var file = '/tmp/a';
var server = net.createServer(function (sck) {
    sck.addr = sck.remoteAddress;
    console.log('Client connected - ' + sck.addr);
    fs.createReadStream('/tmp/a').pipe(new MyProtocolStream()).pipe(sck);
    fs.createReadStream('/tmp/b').pipe(new MyProtocolStream()).pipe(sck);
    fs.createReadStream('/tmp/c').pipe(new MyProtocolStream()).pipe(sck);
    sck.on('close', function () {
        console.log('Client disconnected - ' + this.addr);
    })
});
server.listen(22333, function () {
    console.log('Server started on ' + 22333)
});

见我在_transform的评论

我不确定您正在尝试实现的协议的确切细节,但以下应该会给您一个可以适应您需求的良好模式。

我的虚构协议

当客户端套接字连接到我的TCP服务器时,我想向他们发送一个文件。但首先我想发送一个header。在文件的末尾,在流结束之前,我还想发送一个报头。因此,写入套接字的数据看起来像:

==== HEADER ====
[FILE CONTENTS]
==== FOOTER ====

实现转换流

我要做的就是转换从可读流出来的数据。注意transform是这里的关键字。我可以使用Transform流。

创建Transform流时,您可以覆盖两个方法:_transform_flush_transform在可读流的每个块到来时被调用。你可以改变数据,缓冲数据等等。_flush在所有可读数据完成后被调用。您可以在这里进行更多的清理,或者将最后一位数据写出来。

var Stream = require('stream');
var Util = require('util');
var MyProtocolStream = function () {
    this.writtenHeader = false;            // have we written the header yet?
    Stream.Transform.call(this);
};
Util.inherits(MyProtocolStream, Stream.Transform);
MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {
    if (!this.writtenHeader) {
        this.push('==== HEADER ====n');  // if we've not, send the header first
        this.writtenHeader = true;
    }
    this.push(chunk);                     // send the incoming file chunks along as-is
    callback();
};
MyProtocolStream.prototype._flush = function (callback) {
    this.push('==== FOOTER ====n');      // Just before the stream closes, send footer 
    callback();
};

使用MyProtocolStream

所以现在我有了一个做我想做的事情的流,我可以简单地通过我的自定义Transform流将文件(或任何可读流)管道到任何其他可写流(如套接字)。

var Fs = require('fs');
var Net = require('net');
var server = Net.createServer(function (socket) {
    Fs.createReadStream('./example.txt')
        .pipe(new MyProtocolStream())
        .pipe(socket);
});
server.listen(8000);

测试

我可以通过添加一些内容到example.txt: 来测试这一点。
This is a line
This is another line
This is the last line

我可以启动我的服务器,然后通过telnet/nc连接:

$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
==== HEADER ====
This is a line
This is another line
This is the last line
==== FOOTER ====
Connection closed by foreign host.

那么双工流呢?

双工流是嵌入在一个流中的两个流。数据从其中一个输出,你把完全不同的数据写入另一个。它用于与另一个实体(如TCP套接字)进行双向通信的地方。在这个例子中,我们不需要双工流,因为数据只在一个方向上流动:

file -> MyProtocolStream -> socket

学习更多

正如Meir在另一个答案中指出的那样,Substack的流手册与官方文档一起是流的规范(也是最好的)资源。如果你通读了它们并自己实现了这些例子,你就会学到所有你需要知道的关于流的知识。

通过一个套接字发送多个文件

如果您想要将这些转换流的多个输出写入单个可写端,pipe()将不适合您。一旦EOF来自单个流,上游可写(套接字)也将被关闭。在这种情况下,也不能保证数据事件的顺序。因此,您需要通过监听data/end事件来手动聚合流,开始读取一个又一个完成的流:

var server = Net.createServer(function (socket) {
    var incoming1 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());
    var incoming2 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());
    var readStream = function (stream, callback) {
        stream.on('data', socket.write.bind(socket));
        stream.on('end', callback);
    };
    readStream(incoming1, function () {
        readStream(incoming2, function () {
            socket.end();
        });
    });
});

如果你不喜欢嵌套回调,你也可以使用promises:

var server = Net.createServer(function (socket) {
    var incoming1 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());
    var incoming2 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());
    var incoming3 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());
    var readStream = function (stream) {
        return new Promise(function (resolve, reject) {
            stream.on('data', socket.write.bind(socket));
            stream.on('end', resolve);
        });
    };
    readStream(incoming1)
    .then(function () {
        return readStream(incoming2);
    })
    .then(function () {
        return readStream(incoming3);
    })
    .then(function () {
        socket.end();
    });
});

这是一个关于流的很好的资源。

对于套接字的使用,当你发送/获取套接字消息时,你发送类型和有效负载。因此,您的通用套接字流可以发送类型为'file_data'的消息与内容,而您有数据要发送,并在结束时发送类型为'eof'的消息(用于文件结束)与一个空的有效载荷。

相关内容

  • 没有找到相关文章

最新更新