多个流未正确写入



我需要帮助来修复以下方法中的错误,这是一种将文件从可读流写入存储(例如:HDD,云)的方法。

可以在写入之前对文件应用一些转换(在self.transformWrite()中使用管道)。

如果定义了副本(以 copyTo 为单位),则在每个存储区上调用相同的 write 方法,因此对write的一次调用可能会导致级联write

面临的问题可能是流,也许是异步,我真的不知道,我有一个可读流,可能还有一个或多个可写流(每个write调用至少一个)。

有时,不会调用 from.on('end' 事件,因此即使文件已正确且完全写入磁盘,也永远不会执行预期的代码......或者有时,文件是用 0 字节创建的。

这是完整的代码:https://github.com/jalik/jalik-ufs/blob/master/ufs-store.js

    /**
     * Writes the file to the store
     * @param rs
     * @param fileId
     * @param callback
     */
    self.write = function (rs, fileId, callback) {
        var file = self.getCollection().findOne(fileId);
        var ws = self.getWriteStream(fileId, file);
        var errorHandler = function (err) {
            self.getCollection().remove(fileId);
            self.onWriteError.call(self, err, fileId, file);
            callback.call(self, err);
        };
        rs.on('error', Meteor.bindEnvironment(errorHandler));
        ws.on('error', Meteor.bindEnvironment(errorHandler));
        ws.on('finish', Meteor.bindEnvironment(function () {
            var size = 0;
            var from = self.getReadStream(fileId, file);
            from.on('data', function (data) {
                size += data.length;
            });
            from.on('end', Meteor.bindEnvironment(function () {
                // Set file attribute
                file.complete = true;
                file.progress = 1;
                file.size = size;
                file.token = UploadFS.generateToken();
                file.uploading = false;
                file.uploadedAt = new Date();
                file.url = self.getFileURL(fileId);
                // Sets the file URL when file transfer is complete,
                // this way, the image will loads entirely.
                self.getCollection().update(fileId, {
                    $set: {
                        complete: file.complete,
                        progress: file.progress,
                        size: file.size,
                        token: file.token,
                        uploading: file.uploading,
                        uploadedAt: file.uploadedAt,
                        url: file.url
                    }
                });
                // todo move copy code here
                // Return file info
                callback.call(self, null, file);
                // Execute callback
                if (typeof self.onFinishUpload == 'function') {
                    self.onFinishUpload.call(self, file);
                }
            }));
        }));
        // Simulate write speed
        if (UploadFS.config.simulateWriteDelay) {
            Meteor._sleepForMs(UploadFS.config.simulateWriteDelay);
        }
        // todo execute copy after original file saved
        // Copy file to other stores
        if (options.copyTo instanceof Array) {
            for (var i = 0; i < options.copyTo.length; i += 1) {
                var copyStore = options.copyTo[i];
                var copyId = null;
                var copy = _.omit(file, '_id', 'url');
                copy.originalStore = self.getName();
                copy.originalId = fileId;
                try {
                    // Create the copy
                    copyId = copyStore.create(copy);
                    (function (copyStore, copyId, copy) {
                        // Write the copy
                        copyStore.write(rs, copyId, Meteor.bindEnvironment(function (err) {
                            if (err) {
                                copyStore.getCollection().remove(copyId);
                                self.onCopyError.call(self, err, copyId, copy);
                            }
                        }));
                    })(copyStore, copyId, copy);
                } catch (err) {
                    copyStore.getCollection().remove(copyId);
                    self.onCopyError.call(self, err, copyId, copy);
                }
            }
        }
        // Execute transformation
        self.transformWrite(rs, ws, fileId, file);
    };
}

好的,为了简单起见,我不得不为每个副本重新打开相同的流,而不是使用相同的可读流,所以这里是新代码:

    self.write = function (rs, fileId, callback) {
        var file = self.getCollection().findOne(fileId);
        var ws = self.getWriteStream(fileId, file);
        var errorHandler = Meteor.bindEnvironment(function (err) {
            self.getCollection().remove(fileId);
            self.onWriteError.call(self, err, fileId, file);
            callback.call(self, err);
        });
        ws.on('error', errorHandler);
        ws.on('finish', Meteor.bindEnvironment(function () {
            var size = 0;
            var from = self.getReadStream(fileId, file);
            from.on('data', function (data) {
                size += data.length;
            });
            from.on('end', Meteor.bindEnvironment(function () {
                // Set file attribute
                file.complete = true;
                file.progress = 1;
                file.size = size;
                file.token = UploadFS.generateToken();
                file.uploading = false;
                file.uploadedAt = new Date();
                file.url = self.getFileURL(fileId);
                // Sets the file URL when file transfer is complete,
                // this way, the image will loads entirely.
                self.getCollection().update(fileId, {
                    $set: {
                        complete: file.complete,
                        progress: file.progress,
                        size: file.size,
                        token: file.token,
                        uploading: file.uploading,
                        uploadedAt: file.uploadedAt,
                        url: file.url
                    }
                });
                // Return file info
                callback.call(self, null, file);
                // Execute callback
                if (typeof self.onFinishUpload == 'function') {
                    self.onFinishUpload.call(self, file);
                }
                // Simulate write speed
                if (UploadFS.config.simulateWriteDelay) {
                    Meteor._sleepForMs(UploadFS.config.simulateWriteDelay);
                }
                // Copy file to other stores
                if (options.copyTo instanceof Array) {
                    for (var i = 0; i < options.copyTo.length; i += 1) {
                        var copyStore = options.copyTo[i];
                        var copyId = null;
                        var copy = _.omit(file, '_id', 'url');
                        copy.originalStore = self.getName();
                        copy.originalId = fileId;
                        try {
                            // Create the copy in the collection
                            copyId = copyStore.create(copy);
                            (function (copyStore, copyId, copy) {
                                // Write the copy to the store
                                var cs = self.getReadStream(fileId, file);
                                copyStore.write(cs, copyId, Meteor.bindEnvironment(function (err) {
                                    if (err) {
                                        copyStore.getCollection().remove(copyId);
                                        self.onCopyError.call(self, err, copyId, copy);
                                    }
                                }));
                            })(copyStore, copyId, copy);
                        } catch (err) {
                            copyStore.getCollection().remove(copyId);
                            self.onCopyError.call(self, err, copyId, copy);
                        }
                    }
                }
            }));
        }));
        // Execute transformation
        self.transformWrite(rs, ws, fileId, file);
    };

相关内容

  • 没有找到相关文章

最新更新