等待异步调用完成,然后将新的可观察对象传递给新的



我在一些rxjs调用中遇到了一些困难,需要一些帮助。我的问题是打包一个id列表,对每个id进行异步调用,然后在异步调用完成后传递下一个id。不幸的是,我还没有找到合适的解决方案。

到目前为止,我的代码是:

import { from, interval, of, Subject, timer } from 'rxjs';
import { concatMap, filter, flatMap, map, take } from 'rxjs/operators';
const callFinised$ = new Subject();
const makeAsyncCall = val => {
console.log('start call for ', val);
interval(1000)
.pipe(take(1))
.subscribe(() => {
callFinised$.next(true);
console.log('call finished for ', val);
});
};
from(['id1', 'id2'])
.pipe(
concatMap(val => {
return of(val).pipe(
flatMap(() => {
makeAsyncCall(val);
return callFinised$;
}),
filter(callFinised => callFinised === true),
take(1),
map(() => {
return val + '_updated';
})
);
})
)
.subscribe(finalVal => console.log('finalVal', finalVal));

这里的stackblitz关联:stackblitz

第二个id是在第一个异步完成之前发出的,所以我有点不知所措。

向致以最良好的问候

这是您的代码的一个版本,(我认为)可以实现您想要的功能。

const makeAsyncCall = val => defer(() => {
console.log('start call for ', val);
return timer(1000).pipe(
map(_ => true),
tap(_ => console.log('call finished for ', val))
);
});
from(['id1', 'id2']).pipe(
concatMap(id => makeAsyncCall(id).pipe(
filter(callFinished => callFinished === true),
map(_ => id + '_updated')
))
).subscribe(finalVal => console.log('finalVal', finalVal));

更新:

在上面的代码中,Defer并不能为您完成任何事情,因为您基本上在调用makeAsyncCall的同时对返回的observable调用subscribe

然而,Observables的创建和订阅是不同的。

在下面的代码中,我首先创建了所有的Observable,然后将它们连接起来,并一个接一个地调用它们。如果没有defer,此代码将不会输出您所期望的内容。

const idCalls = ["id1", "id2"].map(id => 
makeAsyncCall(id).pipe(
filter(callFinished => callFinished === true),
map(_ => id + '_updated')
)
);
concat(...idCalls).subscribe(
finalVal => console.log('finalVal', finalVal)
);

最新更新