RxJava Flowable.Interval backpressure when flatmap with sing



我遇到了一个场景,我需要定期调用 API 来检查结果。我正在使用Flowable.interval创建一个调用 API 的间隔函数。

但是,我在背压方面遇到了麻烦。在下面的示例中,在间隔中的每个即时报价上都会创建一个新单曲。所需的效果是仅在调用尚未进行时才调用 API

Flowable.interval(1, 1, TimeUnit.SECONDS).flatMap {
        System.out.println("Delay $it")
        //simulates API call
        Single.just(1L).doAfterSuccess {
            System.out.println("NEW SINGLE!!!")
        }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
            System.out.println("SINGLE SUCCESS!!!")
        }.toFlowable()
    }.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()

我可以使用这样的过滤器变量来解决这个问题:

var filter = true
Flowable.interval(1, 1, TimeUnit.SECONDS).filter {
    filter
}.flatMap {
    System.out.println("Delay $it")
    Single.just(1L).doOnSubscribe {
        filter = true
    }.doAfterSuccess {
        System.out.println("NEW SINGLE!!!")
    }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
        System.out.println("SINGLE!!!")
        filter = true
    }.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()

但这似乎是一个黑客解决方案。我已经厌倦了在interval函数之后应用onBackPressureDrop,但它没有效果。

有什么建议吗?

你还必须约束flatMap

Flowable.interval(1, 1, TimeUnit.SECONDS)
.onBackpressureDrop()
.flatMapSingle({
    System.out.println("Delay $it")
    //simulates API call
    Single.just(1L).doAfterSuccess {
        System.out.println("NEW SINGLE!!!")
    }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
        System.out.println("SINGLE SUCCESS!!!")
    }
}, false, 1)  // <----------------------------------------------------------
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe()

最新更新