Rxjs:阻塞和延迟流



简而言之,尝试将一个非常大的数组分为10个块,等待5秒后再发出下一个10。

以下是我目前拥有的

Rx.Observable
.from(hugeArray)
.bufferCount(10) 
.delay(5000) //want to wait 5 secs
.flatMap(e => e) // this needs to go after to flatten the array, buffer spits out arrays of entries
.flatMap( (data, index) => Rx.Observable.create(observer => {
// going to render stuff here
observer.onNext(data)
observer.onCompleted();  
}))
.subscribe(val => console.log('Buffered Values:', val));

只想在5秒内完成10个块,只能够完成一个初始延迟,然后它就发出了其余的。

您的链只是一次发出所有内容,然后安排每个区块同时等待5秒,这样所有区块的延迟都会在同一时刻过去。

解决方案可以是使用concatMap(),它逐个订阅每个Observable。

Rx.Observable
.from(hugeArray)
.bufferCount(10)
.concatMap(data => Rx.Observable.of(data).delay(5000))
.flatMap(e => e) // or mergeAll() or concatAll()
.flatMap( (data, index) => Rx.Observable.create(observer => {
// going to render stuff here
observer.onNext(data);
observer.onCompleted();
}))
.subscribe(val => console.log('Buffered Values:', val));

最新更新