在nodejs中并行流巨大的行分隔json文件



我正在使用createReadstream读取一个350M行文件,并将每行转换为行分隔文件。下面是我用来做这件事的代码。

var fs = require("fs");
var args = process.argv.slice(2);
var split = require("split")
fs.createReadStream(args[0])
.pipe(split(JSON.parse))
.on('data', function(obj) {
<data trasformation operation>
})
.on('error', function(err) {
})

红色350M线需要40分钟,它只使用一个CPU核心,而这样做。我有16个CPU内核。我怎样才能使这个行读取过程并行运行,以便至少使用10个内核,并在更短的时间内完成整个操作。

我试着使用这个模块- https://www.npmjs.com/package/parallel-transform。但是当我检查htop时,它仍然是单个CPU在执行操作。

var stream = transform(10, {
objectMode: true
}, function(data, callback) {
<data trasformation operation>
callback(null, data);
});
fs.createReadStream(args[0])
.pipe(stream)
.pipe(process.stdout);

流媒体时并行读取文件的更好方法是什么?

您可以尝试scramjet-我很高兴找到一个具有强大的多线程用例的人来设置适当的测试。

你的代码看起来像这样:

var fs = require("fs");
var {StringStream} = require("scramjet");
var args = process.argv.slice(2);
let i = 0;
let threads = os.cpus().length; // you may want to check this out
StringStream.from(fs.createReadStream(args[0]))
.lines() // it's better to deserialize this in the threads
.separate(() => i = ++i % threads)
.cluster(stream => stream // these will happen in the thread
.JSONParse()
.map(yourProcessingFunc) // this can be async as well
)
.mux() // if the function above returns something you'll get
// a stream of results
.run() // this executes the whole workflow.
.catch(errorHandler)

您可以使用更好的affinity函数来分隔,请参阅此处的文档,其中可以根据数据将数据定向到特定的worker。如果你遇到任何问题,请创建一个repo,让我们看看如何解决这些问题。

最新更新