是否可以从fs.readStream拦截和自定义进程流块?



是否可以从fs.readStream处理中游的数据块? 示例:读取文件并计算字符出现的次数,而无需将整个文件存储在内存中。 只需解析流中的每个块并聚合一个数字。我尝试创建一个函数以传递给 .pipe(),但需要定义 fn.on()并且失败了。

我对流媒体有点初学者。 我做了一堆谷歌搜索,阅读,实验,但没有任何帮助。我也找不到有关管道工作原理的任何信息,或者您是否可以制作自己的自定义管道接收器功能。

谢谢

您可以做的最简单的事情就是在"data"事件中订阅并简单地计算发生次数,如下所示:

'use strict';
const fs = require('fs');
const countCharsInString = function (st, char) {
	//Use a regex to calculate this
	let regex = new RegExp(char, 'g');
	return (st.match(regex) || []).length
};
const CHAR_TO_COUNT = '1';
let total_count = 0;
let fileStream = fs.createReadStream('./log.txt');
//We'll calculate the occurrences as the stream emits data event
//As we don't keep any references to 'chunk' and we don't collect the data anywhere
//The garbage collector will clean the memory and we'll not run out of the RAM.
fileStream.on('data', chunk => {
	let string = chunk.toString();
	total_count += countCharsInString(string, CHAR_TO_COUNT);
});
fileStream.on('error', err => console.error(err.stack));
//Log the count when everything is done.
fileStream.on('end', () => console.log(`All done. There are ${total_count} occurrences of character '${CHAR_TO_COUNT}'.`));

但是,如果您想通过流执行此操作,动态聚合数据,然后将其传输到其他地方,您可以执行以下操作:

'use strict';
const fs = require('fs');
const Transform = require('stream').Transform;
//We inherit from the Transform stream class
class OcurrenceCountingStream extends Transform {
	constructor(options) {
		super(options);
		//Allowing to pass an option here
		this.charToCount = options.charToCount;
		this.totalCount = 0;
	}
	//This is now a static method as it's a pure function
	//That does not depend on the object state
	static countCharsInString(st, char) {
		//Use a regex to calculate this
		let regex = new RegExp(char, 'g');
		return (st.match(regex) || []).length
	}
	//We should implement _transform function
	//in order to make all piping mechanisms working
	_transform(chunk, encoding, callback) {
		//Get our string, process and update totalCount
		let string = chunk.toString();
		this.totalCount += OcurrenceCountingStream.countCharsInString(string, this.charToCount);
		//Pass the data further
		callback(null, chunk);
	}
}
let fileStream = fs.createReadStream('./log.txt');
let countingStream = new OcurrenceCountingStream({charToCount: '1'});
//So now we can pipe 
fileStream.pipe(countingStream);
/*
Here is a little moment.
The stream should be in a flowing mode. This means that is started reading the data
From the writable that was piped to it and will keep reading until the writer is ended
So nothing basically happens if we just pipe it like this fileStream.pipe(countingStream);
There are some ways to make countingStream flowing:
1) Subscribe on its 'data' event.
2) Pipe it somewhere
3) Call .resume() is we don't really care of the data that's comming out.
*/
countingStream.resume();
countingStream.on('error', err => console.error(err.stack));
countingStream.on('end', () => console.log(`All done. There are ${countingStream.totalCount} occurrences of character '${countingStream.charToCount}'.`));

最新更新