Node js / javascript:在解析csv和使用get调用webservices时,如何使用pipe和asy



我正在尝试解析csv文件。对于每一行,我需要使用got.

调用服务。我想按顺序处理每一行,因为可能有很多行,如果所有的东西并行运行,我将超载服务。

这是我当前代码的简化版本:

package.json

{
  "type": "module",
  "dependencies": {
    "csv": "^6.0.5",
    "fs": "^0.0.1-security",
    "got": "^12.1.0"
  }
}

服务/service.js

import got from 'got';

export class Service{
    /**
     * @description HTTP GET /api/v2/findings
     * */
     static async GetService1 () {
        let response = await got("https://google.com");
        return response;
    }
    static async GetService2 () {
        let response = await got("https://google.com");
        return response;
    }
}

index.js

// Import the package
import * as csv from 'csv';
import * as fs from 'fs';
import {Service} from './services/service.js';
console.log("start");
let inStream;
inStream = fs.createReadStream(
  "test.csv");
inStream
  .pipe(csv.parse({
      delimiter: ';'
  }))
  .pipe(
    csv.transform(
      { parallel: 1 }, 
      (record) => {
        let col1 = record[0];
        (async () => {
          let response1, response2;
          response1 = await Service.GetService1()
          console.log("line %d, after call 1", col1)
          response2 = await Service.GetService2()
          console.log("line %d, after call 2", col1)
        })();
        console.log("line %d, after async", col1)
      }))
console.log("end")

test.csv

1;"muti-line
comment 1"
2;"muti-line
comment 2"
3;"muti-line
comment 3"

代码显示:

start
end
line 1, after async
line 2, after async
line 3, after async
line 2, after call 1
line 1, after call 1
line 3, after call 1
line 3, after call 2
line 1, after call 2
line 2, after call 2

我们可以看到有几个问题:

  1. 在transform匿名函数中,主线程在异步块之后继续,即使所有的处理都没有完成(消息"after async")出现在call 1"之后")
  2. 线路被并行处理,即使变压器的并行选项为1:我们可以看到,因为调用1之后的所有">
  3. 所有的"after call 2">

我想要得到的是:

start
line 1, after call 1
line 1, after call 2
line 1, after async
line 2, after call 1
line 2, after call 2
line 2, after async
line 3, after call 1
line 3, after call 2
line 3, after async
end

问题1)是一个关于节点async/await的一般性问题。问题2可能与CSV库或流管道如何工作有关。

有什么建议让一切工作顺序?理想情况下,我希望能够配置给定级别的并行性,但我可以接受完全顺序执行。

我对其他库/实现开放(对于csv解析要注意多行字段)。最好使用async/await而不是promises,因为它似乎是更最新的语法,但如果它只能用promises来完成,为什么不呢…

谢谢!

如果你使用ReadableStream作为AsyncIterator,你可以使用for await ... of循环。这将允许你await你的异步操作:

let inStream;
inStream = fs.createReadStream(
  "test.csv");
const readable = inStream
  .pipe(csv.parse({
      delimiter: ';'
  }));
let i = 0;
for await (const record of readable) {
  let col1 = record[0];
  let response1, response2;
  response1 = await Service.GetService1()
  console.log("line %d, after call 1", col1)
  response2 = await Service.GetService2()
  console.log("line %d, after call 2", col1)
}
console.log("end");

线路被并行处理,即使变压器的并行选项为1

在您的原始代码中,您获得1行CSV数据,您启动异步操作,获得"line %d, after async"日志,然后获得另一行。CSV库一次只解析一行。只是你告诉它"执行一个异步操作,然后等待它完成",而不是"执行一个异步操作,然后继续执行,而不用等待"。


最好使用async/await而不是promises

Async/await 使用承诺,只是语法不同。

for await ... of是一种接受AsyncIterator的循环类型,这是一个对象(例如,ReadableStream的实例),可以返回多个promise并且(技术上可选)说它们'完成'并停止循环。

通过反复获取Promise,将其解析为一个值(一行CSV数据)并在两者之间使用await,您可以逐行处理流,这种方式比使用pipe()on('...')基于事件的函数更短。当流耗尽时,告诉for await ... of循环迭代器'完成',并执行下一行console.log("end");

最新更新