RXJS节点等待可观察到的序列订阅下一个



我试图用从Google表中刮下来的值填充我的数据库。hasurahelper.insert()在我的数据库中发表了HTTP帖子。我该如何制作它,以便仅在上一个API呼叫返回后才调用insert()?我认为一个辅助图可以解决问题,但似乎仍然急切地订阅了所有排放。

当前:请求1->请求2-> ... ->请求1完成 ->请求2完成 -> ...

我想要的:请求1->请求1完成 ->请求2->请求2完成 -> ...

这是相关代码:

sheetsHelper.authToken()
    .flatMap(sheetsHelper.get)
    .flatMap(response => response.values) //values is a 2d array
    .map(row => {
        console.log(row[0]);
        const flights = [];
        //add 100-500 objects to this array
        return flights;
    })
    .flatMap(flights => {
        const arrayOfFlightArrays = [];
        //max batch size of 50
        while (flights.length) {
            const flightArr = flights.splice(0, 50);
            arrayOfFlightArrays.push(flightArr);
        }
        return Rx.Observable.from(arrayOfFlightArrays);
    })
    .concatMap(flights => hasuraHelper.insert(flights) //insert flights into table
        .retry()) 
    .map(response => response.data)
    .subscribe(console.log, console.error);

这就是插入的样子:

exports.insert = function (objects) {
    return Rx.Observable.fromPromise(axios({
        method: 'post',
        url: '/',
        data: {
            type: "insert",
            args: {
                table: "flights",
                objects: objects
            }
        }
    }));
};

您的插入功能不是懒惰的。您正在将执行诺言包裹在可观察的诺言中,但它已经在执行。

要获得懒惰并开始处理时,您需要将其包裹在Rx.Observable.defer(() => Rx.Observable.fromPromise(axios(/* ... */))

最新更新