我需要帮助修复下面这个方法中的错误。该方法用于把文件从可读流写入存储(例如硬盘、云端)。
写入之前可以对文件做一些转换(在 self.transformWrite() 中通过管道完成)。
如果定义了副本(通过 copyTo 选项),则会在每个副本存储上调用同一个 write 方法,因此一次 write 调用可能引发级联的 write 调用。
我遇到的问题可能与流有关,也可能与异步有关,我并不确定。我有一个可读流,以及一个或多个可写流(每次 write 调用至少对应一个)。
有时 from.on('end', …) 事件不会被触发,因此即使文件已经正确且完整地写入磁盘,后续预期的代码也永远不会执行……而有时创建出来的文件只有 0 字节。
这是完整的代码:https://github.com/jalik/jalik-ufs/blob/master/ufs-store.js
/**
 * Writes the file to the store.
 *
 * Hands `rs` and the store's write stream to self.transformWrite(), which is
 * expected to pipe one into the other. Once the write stream emits 'finish',
 * the stored file is read back to measure its size, the file document is
 * updated in the collection and `callback` is invoked.
 * When options.copyTo is an array, the same `rs` is also passed to the
 * write() method of every listed store.
 *
 * NOTE(review): `rs` is shared between this store and all copy stores, and the
 * copies are started before `rs` has been consumed by this store. This is
 * presumably the cause of the reported symptoms ('end' never firing, 0-byte
 * files) — confirm.
 *
 * @param rs readable stream providing the file content
 * @param fileId ID of the file document in the store's collection
 * @param callback called as callback(err) on failure, callback(null, file) on success
 */
self.write = function (rs, fileId, callback) {
var file = self.getCollection().findOne(fileId);
var ws = self.getWriteStream(fileId, file);
// Shared failure path: drops the file document, notifies onWriteError and
// reports the error to the caller.
// NOTE(review): nothing prevents this from running more than once if both
// streams emit 'error'.
var errorHandler = function (err) {
self.getCollection().remove(fileId);
self.onWriteError.call(self, err, fileId, file);
callback.call(self, err);
};
rs.on('error', Meteor.bindEnvironment(errorHandler));
ws.on('error', Meteor.bindEnvironment(errorHandler));
ws.on('finish', Meteor.bindEnvironment(function () {
// Read the stored file back and count its bytes to get the final size.
var size = 0;
var from = self.getReadStream(fileId, file);
// NOTE(review): `from` has no 'error' listener, so a failed re-read never
// reaches `callback`.
from.on('data', function (data) {
size += data.length;
});
from.on('end', Meteor.bindEnvironment(function () {
// Mark the file as completely uploaded
file.complete = true;
file.progress = 1;
file.size = size;
file.token = UploadFS.generateToken();
file.uploading = false;
file.uploadedAt = new Date();
file.url = self.getFileURL(fileId);
// The file URL is only set once the transfer is complete,
// so that an image is loaded entirely.
self.getCollection().update(fileId, {
$set: {
complete: file.complete,
progress: file.progress,
size: file.size,
token: file.token,
uploading: file.uploading,
uploadedAt: file.uploadedAt,
url: file.url
}
});
// TODO: move the copy code here
// Return the file info to the caller
callback.call(self, null, file);
// Notify the optional onFinishUpload hook
if (typeof self.onFinishUpload == 'function') {
self.onFinishUpload.call(self, file);
}
}));
}));
// Simulate write speed
if (UploadFS.config.simulateWriteDelay) {
Meteor._sleepForMs(UploadFS.config.simulateWriteDelay);
}
// TODO: execute the copy after the original file has been saved
// Copy the file to the other stores
if (options.copyTo instanceof Array) {
for (var i = 0; i < options.copyTo.length; i += 1) {
var copyStore = options.copyTo[i];
var copyId = null;
// The copy document reuses the file's fields except its ID and URL,
// and records which store and file it was copied from.
var copy = _.omit(file, '_id', 'url');
copy.originalStore = self.getName();
copy.originalId = fileId;
try {
// Create the copy document in the target store
copyId = copyStore.create(copy);
// The IIFE captures the current loop values for the asynchronous callback.
(function (copyStore, copyId, copy) {
// Write the copy, reading from the same `rs` as the original write
copyStore.write(rs, copyId, Meteor.bindEnvironment(function (err) {
if (err) {
copyStore.getCollection().remove(copyId);
self.onCopyError.call(self, err, copyId, copy);
}
}));
})(copyStore, copyId, copy);
} catch (err) {
// NOTE(review): copyId is still null here when create() itself threw.
copyStore.getCollection().remove(copyId);
self.onCopyError.call(self, err, copyId, copy);
}
}
}
// Execute the transformation; this starts the actual write of rs into ws
self.transformWrite(rs, ws, fileId, file);
};
}
好的,为了简单起见,我改为给每个副本重新打开一个读取已存储文件的新可读流,而不是让所有副本共用同一个可读流。以下是新代码:
/**
 * Writes the file to the store, then copies it to every store listed in
 * options.copyTo.
 *
 * The content of `rs` is written through self.transformWrite(). When the
 * write stream finishes, the stored file is read back to compute its size,
 * the file document is updated, `callback` and the optional onFinishUpload
 * hook are called, and finally each copy is written from its own read stream
 * of the stored file.
 *
 * Any error on the source stream, the write stream or the read-back stream
 * removes the file document, notifies onWriteError and is reported to
 * `callback`. The outcome (success or failure) is reported at most once.
 *
 * @param rs readable stream providing the file content
 * @param fileId ID of the file document in the store's collection
 * @param callback called as callback(err) on failure, callback(null, file) on success
 */
self.write = function (rs, fileId, callback) {
    var file = self.getCollection().findOne(fileId);
    var ws = self.getWriteStream(fileId, file);
    // Set as soon as the outcome has been reported, so that several failing
    // streams (or a late error) cannot invoke the callback a second time.
    var settled = false;

    var errorHandler = Meteor.bindEnvironment(function (err) {
        if (settled) {
            return;
        }
        settled = true;
        self.getCollection().remove(fileId);
        self.onWriteError.call(self, err, fileId, file);
        callback.call(self, err);
    });

    // Creates the copy document in copyStore and writes the stored file to it.
    var copyToStore = function (copyStore) {
        var copyId = null;
        var copy = _.omit(file, '_id', 'url');
        copy.originalStore = self.getName();
        copy.originalId = fileId;

        try {
            // Create the copy in the collection
            copyId = copyStore.create(copy);
            // Every copy gets its own read stream of the stored file: a
            // readable stream cannot be safely shared between several writes.
            var cs = self.getReadStream(fileId, file);
            copyStore.write(cs, copyId, Meteor.bindEnvironment(function (err) {
                if (err) {
                    copyStore.getCollection().remove(copyId);
                    self.onCopyError.call(self, err, copyId, copy);
                }
            }));
        } catch (err) {
            // create() may have thrown before an ID was assigned
            if (copyId) {
                copyStore.getCollection().remove(copyId);
            }
            self.onCopyError.call(self, err, copyId, copy);
        }
    };

    // pipe() does not forward source errors to the destination, so the
    // source stream needs its own listener.
    rs.on('error', errorHandler);
    ws.on('error', errorHandler);
    ws.on('finish', Meteor.bindEnvironment(function () {
        // Read the stored file back to compute its final size
        var size = 0;
        var from = self.getReadStream(fileId, file);

        from.on('error', errorHandler);
        from.on('data', function (data) {
            size += data.length;
        });
        from.on('end', Meteor.bindEnvironment(function () {
            if (settled) {
                return;
            }
            settled = true;

            // Set file attributes
            file.complete = true;
            file.progress = 1;
            file.size = size;
            file.token = UploadFS.generateToken();
            file.uploading = false;
            file.uploadedAt = new Date();
            file.url = self.getFileURL(fileId);

            // The file URL is only set once the transfer is complete,
            // so that an image is loaded entirely.
            self.getCollection().update(fileId, {
                $set: {
                    complete: file.complete,
                    progress: file.progress,
                    size: file.size,
                    token: file.token,
                    uploading: file.uploading,
                    uploadedAt: file.uploadedAt,
                    url: file.url
                }
            });

            // Return file info
            callback.call(self, null, file);

            // Notify the optional hook
            if (typeof self.onFinishUpload === 'function') {
                self.onFinishUpload.call(self, file);
            }

            // Simulate write speed
            if (UploadFS.config.simulateWriteDelay) {
                Meteor._sleepForMs(UploadFS.config.simulateWriteDelay);
            }

            // Copy file to other stores
            if (options.copyTo instanceof Array) {
                options.copyTo.forEach(copyToStore);
            }
        }));
    }));

    // Execute transformation
    self.transformWrite(rs, ws, fileId, file);
};