为什么Readable.push()每次Readable都返回false_调用read()



我在typescript中有以下可读流:

import {Readable} from "stream";
enum InputState {
    NOT_READABLE,
    READABLE,
    ENDED
}
export class Aggregator extends Readable {
    private inputs: Array<NodeJS.ReadableStream>;
    private states: Array<InputState>;
    private records: Array<any>;
    constructor(options, inputs: Array<NodeJS.ReadableStream>) {
        // force object mode
        options.objectMode = true;
        super(options);
        this.inputs = inputs;
        // set initial state
        this.states = this.inputs.map(() => InputState.NOT_READABLE);
        this.records = this.inputs.map(() => null);
        // register event handlers for input streams
        this.inputs.forEach((input, i) => {
            input.on("readable", () => {
                console.log("input", i, "readable event fired");
                this.states[i] = InputState.READABLE;
                if (this._readable) { this.emit("_readable"); }
            });
            input.on("end", () => {
                console.log("input", i, "end event fired");
                this.states[i] = InputState.ENDED;
                // if (this._end) { this.push(null); return; }
                if (this._readable) { this.emit("_readable"); }
            });
        });
    }
    get _readable () {
        return this.states.every(
            state => state === InputState.READABLE ||
            state === InputState.ENDED);
    }
    get _end () {
        return this.states.every(state => state === InputState.ENDED);
    }
    _aggregate () {
        console.log("calling _aggregate");
        let timestamp = Infinity,
            indexes = [];
        console.log("initial record state", JSON.stringify(this.records));
        this.records.forEach((record, i) => {
            // try to read missing records
            if (!this.records[i] && this.states[i] !== InputState.ENDED) {
                this.records[i] = this.inputs[i].read();
                if (!this.records[i]) {
                    this.states[i] = InputState.NOT_READABLE;
                    return;
                }
            }
            // update timestamp if a better one is found
            if (this.records[i] && timestamp > this.records[i].t) {
                timestamp = this.records[i].t;
                // clean the indexes array
                indexes.length = 0;
            }
            // include the record index if has the required timestamp
            if (this.records[i] && this.records[i].t === timestamp) {
                indexes.push(i);
            }
        });
        console.log("final record state", JSON.stringify(this.records), indexes, timestamp);
        // end prematurely if after trying to read inputs the aggregator is
        // not ready
        if (!this._readable) {
            console.log("end prematurely trying to read inputs", this.states);
            this.push(null);
            return;
        }
        // end prematurely if all inputs are ended and there is no remaining
        // record values
        if (this._end && indexes.length === 0) {
            console.log("end on empty indexes", this.states);
            this.push(null);
            return;
        }
        // create the aggregated record
        let record = {
            t: timestamp,
            v: this.records.map(
                (r, i) => indexes.indexOf(i) !== -1 ? r.v : null
            )
        };
        console.log("aggregated record", JSON.stringify(record));
        if (this.push(record)) {
            console.log("record pushed downstream");
            // remove records already aggregated and pushed
            indexes.forEach(i => { this.records[i] = null; });
            this.records.forEach((record, i) => {
                // try to read missing records
                if (!this.records[i] && this.states[i] !== InputState.ENDED) {
                    this.records[i] = this.inputs[i].read();
                    if (!this.records[i]) {
                        this.states[i] = InputState.NOT_READABLE;
                    }
                }
            });
        } else {
            console.log("record failed to push downstream");
        }
    }
    _read () {
        console.log("calling _read", this._readable);
        if (this._readable) { this._aggregate(); }
        else {
            this.once("_readable", this._aggregate.bind(this));
        }
    }
}

它被设计为在对象模式下聚合多个输入流。最后,它将多个时间序列数据流聚合为一个数据流。我面临的问题是,当我测试该功能时,我会重复看到消息record failed to push downstream和消息calling _read true,并且仅在与聚合算法相关的3条消息之间。因此Readable流机制调用_read,每次调用push((都失败。知道为什么会发生这种事吗?你知道有什么库可以实现这种算法,或者有更好的方法来实现这种功能吗?

我会自己回答这个问题。

问题是我误解了this.push()返回值调用的含义。我认为错误的返回值意味着当前的推送操作失败,但真正的意义是下一次推送操作将失败。

上面显示的代码的一个简单修复是替换这个:

if (this.push(record)) {
    console.log("record pushed downstream");
    // remove records already aggregated and pushed
    indexes.forEach(i => { this.records[i] = null; });
    this.records.forEach((record, i) => {
        // try to read missing records
        if (!this.records[i] && this.states[i] !== InputState.ENDED) {
            this.records[i] = this.inputs[i].read();
            if (!this.records[i]) {
                this.states[i] = InputState.NOT_READABLE;
            }
        }
    });
} else {
    console.log("record failed to push downstream");
}

通过这个:

this.push(record);
console.log("record pushed downstream");
// remove records already aggregated and pushed
indexes.forEach(i => { this.records[i] = null; });
this.records.forEach((record, i) => {
    // try to read missing records
    if (!this.records[i] && this.states[i] !== InputState.ENDED) {
        this.records[i] = this.inputs[i].read();
        if (!this.records[i]) {
            this.states[i] = InputState.NOT_READABLE;
        }
    }
});

您可以注意到,唯一的区别是避免对this.push()调用的返回值进行条件操作。假设当前实现每个_read()调用this.push()仅一次,那么这个简单的更改就解决了问题。

这意味着进食比消耗快。官方方法是放大其高WaterMark,默认值:16384(16KB(,或16表示objectMode。只要其内部缓冲区足够大,push函数就会始终返回true。它不必是single_read((中的single-push((。您可以在单个_read((中按highWaterMark所指示的数量进行推送。

相关内容

最新更新