observableOne.flatMap(event -> observableTwo).subscribe()
我想从observableOne
第一次发出一个事件,然后忽略这个Observable
的所有其他事件,直到observableTwo
发出一个值,完成或结束一个错误。值得一提的是,我最终感兴趣的是来自第二个可观察对象的事件。
上下文是这样的,有一个Button
点击,触发了一个事件,这是ObservableOne
。触发事件触发ObservableTwo
,假设这是一个网络操作链。因此,当网络操作正在执行时,我想忽略所有的按钮点击。
您可以使用Flowable
而不是Observable
,然后您可以通过将flatMap
的maxConcurrency
设置为1
来使用背压来实现所需的效果(删除事件直到observableTwo
终止):
observableOne
.toFlowable(BackpressureStrategy.DROP)
.flatMap(event -> observableTwo.toFlowable(BackpressureStrategy.ERROR), 1)
.subscribe();
我在这里贴了一个类似的问题。
在akarnokd的RxJava2Extensions库中也有一个完全用于此目的的ObservableTransformer
。它可以这样使用:
observableOne
.compose(ObservableTransformers.flatMapDrop(event -> observableTwo))
.subscribe();
要控制flatMap
发出的请求数量,请使用一个特殊的过载:
observableOne
.doOnRequest(System.out::println)
.flatMap(event -> observableTwo, 1)
.flatMap(event -> observableThree, 1)
.subscribe()
如果您的源observableOne
, observableTwo
, observableThree
是同步的,这应该是不必要的,但对于异步源,这应该做的工作
[Edit2]问题改变了,所以我调整了我的答案+实际上给出了正确的答案
除了使用状态标志之外,我不认为有其他方法可以做到这一点:
AtomicBoolean progress = new AtomicBoolean();
observableOne
.filter(event -> !progress.get())
.flatMap(event ->
observableTwo
.doOnSubscribe(() -> progress.set(true))
.doOnTerminate(() -> progress.set(false))
)
.subscribe();
如果发生错误,您的订阅将被取消,您将不会收到任何更多的事件,即使再次点击按钮。
如果不是你想要的,你可以:
重新订阅错误回调
private void bindRemoteCalls() { if (mySubscription != null) mySubscription.unsubscribe(); AtomicBoolean progress = new AtomicBoolean(); mySubscription = observableOne .filter(event -> !progress.get()) .flatMap(event -> observableTwo .doOnSubscribe(() -> progress.set(true)) .doOnTerminate(() -> progress.set(false)) ) .flatMap(event -> observableTwo, 1) .subscribe( data -> handleResponse(data), error -> { handleError(error); bindRemoteCalls(); } ); }
使用
onErrorReturn()
(结合doOnError()
实际做一些事情)AtomicBoolean progress = new AtomicBoolean(); observableOne .filter(event -> !progress.get()) .flatMap(event -> observableTwo .doOnSubscribe(() -> progress.set(true)) .doOnTerminate(() -> progress.set(false)) .doOnError(error -> handleError(error)) .onErrorReturn(null) .filter(data -> data != null) ) .subscribe(data -> handleResponse(data));
如果需要的话,请记住使用subscribeOn()
/observeOn()
和正确的调度程序。
请考虑使用switchMap()
代替flatMap()
——>如果按钮再次按下,以前的呼叫被取消(取消订阅),然后一个新的呼叫开始。或者用Rx术语来说:observableTwo先前的订阅被取消订阅,并形成一个新的订阅。
如果你禁用你的按钮退订和启用它终止时,你可以很容易地获得相同的结果,使按钮不可点击