我正在尝试解析csv文件。对于每一行,我需要使用got.
调用服务。我想按顺序处理每一行,因为可能有很多行,如果所有的东西并行运行,我将超载服务。
这是我当前代码的简化版本:
package.json
{
"type": "module",
"dependencies": {
"csv": "^6.0.5",
"fs": "^0.0.1-security",
"got": "^12.1.0"
}
}
服务/service.js
import got from 'got';
export class Service{
/**
* @description HTTP GET /api/v2/findings
* */
static async GetService1 () {
let response = await got("https://google.com");
return response;
}
static async GetService2 () {
let response = await got("https://google.com");
return response;
}
}
index.js
// Import the package
import * as csv from 'csv';
import * as fs from 'fs';
import {Service} from './services/service.js';
console.log("start");
let inStream;
inStream = fs.createReadStream(
"test.csv");
inStream
.pipe(csv.parse({
delimiter: ';'
}))
.pipe(
csv.transform(
{ parallel: 1 },
(record) => {
let col1 = record[0];
(async () => {
let response1, response2;
response1 = await Service.GetService1()
console.log("line %d, after call 1", col1)
response2 = await Service.GetService2()
console.log("line %d, after call 2", col1)
})();
console.log("line %d, after async", col1)
}))
console.log("end")
test.csv
1;"muti-line
comment 1"
2;"muti-line
comment 2"
3;"muti-line
comment 3"
代码显示:
start
end
line 1, after async
line 2, after async
line 3, after async
line 2, after call 1
line 1, after call 1
line 3, after call 1
line 3, after call 2
line 1, after call 2
line 2, after call 2
我们可以看到有几个问题:
- 在transform匿名函数中,主线程在异步块之后继续,即使所有的处理都没有完成(消息"after async")出现在call 1"之后")
- 线路被并行处理,即使变压器的并行选项为1:我们可以看到,因为调用1之后的所有"> 所有的"after call 2">
我想要得到的是:
start
line 1, after call 1
line 1, after call 2
line 1, after async
line 2, after call 1
line 2, after call 2
line 2, after async
line 3, after call 1
line 3, after call 2
line 3, after async
end
问题1)是一个关于节点async/await的一般性问题。问题2可能与CSV库或流管道如何工作有关。
有什么建议让一切工作顺序?理想情况下,我希望能够配置给定级别的并行性,但我可以接受完全顺序执行。
我对其他库/实现开放(对于csv解析要注意多行字段)。最好使用async/await而不是promises,因为它似乎是更最新的语法,但如果它只能用promises来完成,为什么不呢…
谢谢!
如果你使用ReadableStream作为AsyncIterator,你可以使用for await ... of
循环。这将允许你await
你的异步操作:
let inStream;
inStream = fs.createReadStream(
"test.csv");
const readable = inStream
.pipe(csv.parse({
delimiter: ';'
}));
let i = 0;
for await (const record of readable) {
let col1 = record[0];
let response1, response2;
response1 = await Service.GetService1()
console.log("line %d, after call 1", col1)
response2 = await Service.GetService2()
console.log("line %d, after call 2", col1)
}
console.log("end");
线路被并行处理,即使变压器的并行选项为1
在您的原始代码中,您获得1行CSV数据,您启动异步操作,获得"line %d, after async"
日志,然后获得另一行。CSV库一次只解析一行。只是你告诉它"执行一个异步操作,然后等待它完成",而不是"执行一个异步操作,然后继续执行,而不用等待"。
最好使用async/await而不是promises
Async/await 是使用承诺,只是语法不同。
for await ... of
是一种接受AsyncIterator的循环类型,这是一个对象(例如,ReadableStream的实例),可以返回多个promise并且(技术上可选)说它们'完成'并停止循环。
通过反复获取Promise,将其解析为一个值(一行CSV数据)并在两者之间使用await
,您可以逐行处理流,这种方式比使用pipe()
或on('...')
基于事件的函数更短。当流耗尽时,告诉for await ... of
循环迭代器'完成',并执行下一行console.log("end");
。