异步操作的可观察(主题),当其中没有重叠的异步操作时完成



我希望答案是forkJoin/Promises.all,但有点多,请耐心等待。

我有一个承诺的来源,可以按随机顺序到达,我需要用某种方式说"到目前为止到达的所有承诺都完成了,请告诉我"。

在基于Promise的解决方案中,我最初考虑使用Promise.all,但当其他人还没有完成时,Promise仍然可以"到达"。有趣的是,在https://stackoverflow.com/a/37819138/239168

我正试着用Rx的方式来做这件事。在阅读了一些文档后,我认为forkJoinPromise.all的等价物,然而,同样的问题是,我没有时间点可以安全地调用forkJoinPromise.all,因为在另一个仍然挂起的情况下,总是可以再添加一个。。。由于我现在可能没有任何意义,我想我会寻求一些指导。

设置

(如果这很傻,请别笑,我是Rx的新手…)

我有一个主题,我想知道里面的所有承诺什么时候都完成了。。。而且它总是可以在任何时候获得新的附加承诺。。。

private promiseSource = new Subject<Promise<any>>();
promises$ = this.promiseSource.asObservable();

每次新的承诺"到来",我只是把它添加到主题中

this.promiseSource.next(somePromise);

我想神奇地发生的是——只要主题只包含完整的承诺,就让它"完整"。

例如

promises$.magicFlatMapForkJoinConcatMapTrickery().subscribe({
next: x => ...,
error: err => ...,
complete: () => {
console.log('all promises we got so far are done');
// nice to have, I want this to keep "listening" for new promises
promiseSource.youAreNotREALYCompletePleaseReset(); 
}
});

或者换句话说,我有一个可观察到的异步动作,如果我们看一下内容,我们可以看到重叠的异步动作。我想知道什么时候没有重叠,例如

|<-async action 1->|   |<-async action 3->|
|<-async action 2->|                     |<-async action 4->|
/      /
find this gap

例如,如果这些是http调用,我基本上是在问——告诉我什么时候没有打开的http调用。

tl;dr

如何在RxJS世界中实现这个基于Promises的答案。。。

https://stackoverflow.com/a/37819138/239168

如果我正确解释了你的问题,你只对一个信号感兴趣,这个信号表明是否有待定的承诺。

使用mergescan可以很容易地创建一个可观察的对象,该对象会发出一系列挂起的承诺,因此,您应该能够创建任何您喜欢的信号。

基本上,每次主体发出promise时,挂起的promise的计数都应该递增。每次这些承诺中的一个得到解决,计数都可以递减。

const promises = new Rx.Subject();
const pendingCount = Rx.Observable
.merge(
promises.mapTo(1),
promises.mergeMap(p => Rx.Observable.from(p).mapTo(-1))
)
.scan((acc, value) => acc + value, 0)
.do(count => console.log(`${count} pending promise(s)`));
const doneSignal = pendingCount
.filter(count => count === 0)
.mapTo("done");
doneSignal.subscribe(signal => console.log(signal));
const timeoutPromise = (delay) => new Promise(resolve => setTimeout(resolve, delay));
promises.next(timeoutPromise(200));
setTimeout(() => promises.next(timeoutPromise(200)), 100);
setTimeout(() => promises.next(timeoutPromise(200)), 300);
setTimeout(() => promises.next(timeoutPromise(200)), 700);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

根据前面的答案,我可以想出一种相当简单的方法来实现这一点。您可以使用fromPromise将您的Subject<Promise<any>>转换为Subject<Observable<any>>,然后您可以使用本答案中描述的active函数将其简化为活动可观测的可观测值。一旦你做到了,你可以把你的查询短语为"当活动流的数组变空时",这可以用一个简单的过滤器来完成,例如:

active(yourSubjectOfObservables).filter(x => x.length === 0).subscribe(() => {
// here we are, all complete
});

这将在每次活动流的数量转换为零时触发,因此如果您只想第一次,请放置.take(1)或。CCD_ 15。

可能不是最漂亮的解决方案,但它在概念上很简单。

最新更新