如何使用nodejs中的流块在S3上过滤存储为.json.gz的对象数组



假设s3中存储的json数组是个人详细信息,例如:[{"name":'A', "lastName":'A', age: 18}, {"name":'B', "lastName":'B', age: 20}, ...]这个文件可能非常大,我想优化内存使用并使用流来过滤数据,而不是将整个文件加载到内存中并过滤它。我不确定我完全理解"objectmode"是怎么回事。在这里工作。

我已经尝试了以下失败,因为控制台日志打印块为相同大小的字符串字节,而不是使用objectMode: true的对象组:

const filteredData = [];
const filterTransform = new Transform({
objectMode: true,
transform(chunk, _, callback) {
console.log("chunk : n"+chunk);
try {
const filteredData = chunk.map((item: any) => ({
name: item.name,
lastName: item.lastName,
}));
filteredData.push(JSON.stringify(filteredData));
} catch (err) {
callback(err);
}
callback();
},
});
const client = getS3Client();
const command = new GetObjectCommand({
Bucket: bucket,
Key: key,
});
const data:GetObjectCommandOutput= await client.send(command);
const readStream = (dataSingle.Body! as Readable)
.pipe(zlib.createGunzip())
.pipe(filterTransform)

示例输出为块1"{name:'A', lastName:'"块2"A'}, {name:'B', lastN"块3等等…

但我期望:块1[{"name":'A', "lastName":'A'}, {"name":'B', "lastName":'B'}]chunk 2[{"name":'C', "lastName":'C'}, {"name":'D', "lastName":'D'}]

…如何使块被计数为对象列表而不是字节?

要在S3上使用Node.js中的Streams块来过滤存储为.json.gz的对象数组,您需要确保您的流处于对象模式并且流发出的块是完整的对象。

在您的代码中,看起来您正在尝试使用transform流转换数据,但您没有正确处理块。当以对象模式处理流时,每个块应该代表一个完整的对象。如果对象被分割成多个块,在处理它之前需要缓冲这些块,直到有一个完整的对象。

下面是一个如何修改代码以正确过滤数据的示例:

const filterTransform = new Transform({
objectMode: true,
transform(chunk, _, callback) {
try {
const data = JSON.parse(chunk);
const filteredData = data.map((item) => ({
name: item.name,
lastName: item.lastName,
}));
callback(null, JSON.stringify(filteredData));
} catch (err) {
callback(err);
}
},
});
const client = getS3Client();
const command = new GetObjectCommand({
Bucket: bucket,
Key: key,
});
const data = await client.send(command);
const readStream = data.Body!.pipe(zlib.createGunzip()).pipe(filterTransform);
// consume the filtered data
readStream.on("data", (chunk) => {
console.log(chunk);
});
readStream.on("error", (err) => {
console.error(err);
});
readStream.on("end", () => {
console.log("Done");
});
例如,让我们尝试使用Transform流来解析和过滤数据。设置objectMode: true表示流应该处理完整的对象。在转换方法中,我们解析传入的JSON字符串并过滤数据。然后使用过滤后的数据作为JSON字符串调用回调函数。

当消费流时,我们必须监听数据事件,它将作为完整对象发出过滤后的数据。如果需要,我们还可以根据要求将过滤后的数据写入文件或另一个流

注意

在上面的例子中,我们假设流发出的每个块都是一个完整的JSON对象。如果块不是完整的对象,你需要缓冲块,直到你有一个完整的对象,然后再处理它。

最新更新