只有当一个可观察对象从链中发出一个事件后,才能获取事件



observableOne.flatMap(event -> observableTwo).subscribe()

我想从observableOne第一次发出一个事件,然后忽略这个Observable的所有其他事件,直到observableTwo发出一个值,完成或结束一个错误。值得一提的是,我最终感兴趣的是来自第二个可观察对象的事件。

上下文是这样的,有一个Button点击,触发了一个事件,这是ObservableOne。触发事件触发ObservableTwo,假设这是一个网络操作链。因此,当网络操作正在执行时,我想忽略所有的按钮点击。

您可以使用Flowable而不是Observable,然后您可以通过将flatMapmaxConcurrency设置为1来使用背压来实现所需的效果(删除事件直到observableTwo终止):

observableOne
    .toFlowable(BackpressureStrategy.DROP)
    .flatMap(event -> observableTwo.toFlowable(BackpressureStrategy.ERROR), 1)
    .subscribe();

我在这里贴了一个类似的问题。

在akarnokd的RxJava2Extensions库中也有一个完全用于此目的的ObservableTransformer。它可以这样使用:
observableOne
    .compose(ObservableTransformers.flatMapDrop(event -> observableTwo))
    .subscribe();

要控制flatMap发出的请求数量,请使用一个特殊的过载:

observableOne
  .doOnRequest(System.out::println)
  .flatMap(event -> observableTwo, 1)
  .flatMap(event -> observableThree, 1)
.subscribe()

如果您的源observableOne, observableTwo, observableThree是同步的,这应该是不必要的,但对于异步源,这应该做的工作

[Edit2]问题改变了,所以我调整了我的答案+实际上给出了正确的答案

除了使用状态标志之外,我不认为有其他方法可以做到这一点:

AtomicBoolean progress = new AtomicBoolean();
observableOne
        .filter(event -> !progress.get())
        .flatMap(event ->
                observableTwo
                        .doOnSubscribe(() -> progress.set(true))
                        .doOnTerminate(() -> progress.set(false))
        )
        .subscribe();

如果发生错误,您的订阅将被取消,您将不会收到任何更多的事件,即使再次点击按钮。

如果不是你想要的,你可以:

  • 重新订阅错误回调

    private void bindRemoteCalls() {
        if (mySubscription != null) mySubscription.unsubscribe();
        AtomicBoolean progress = new AtomicBoolean();
        mySubscription = observableOne
            .filter(event -> !progress.get())
            .flatMap(event ->
                    observableTwo
                            .doOnSubscribe(() -> progress.set(true))
                            .doOnTerminate(() -> progress.set(false))
            )
            .flatMap(event -> observableTwo, 1)
            .subscribe(
                data -> handleResponse(data),
                error -> {
                    handleError(error);
                    bindRemoteCalls();
                }
            );
    }
    
  • 使用onErrorReturn()(结合doOnError()实际做一些事情)

    AtomicBoolean progress = new AtomicBoolean();
    observableOne
        .filter(event -> !progress.get())
        .flatMap(event ->
                observableTwo
                        .doOnSubscribe(() -> progress.set(true))
                        .doOnTerminate(() -> progress.set(false))
                        .doOnError(error -> handleError(error))
                        .onErrorReturn(null)
                        .filter(data -> data != null)
        )
        .subscribe(data -> handleResponse(data));
    

如果需要的话,请记住使用subscribeOn()/observeOn()和正确的调度程序。

请考虑使用switchMap()代替flatMap()——>如果按钮再次按下,以前的呼叫被取消(取消订阅),然后一个新的呼叫开始。或者用Rx术语来说:observableTwo先前的订阅被取消订阅,并形成一个新的订阅。

如果你禁用你的按钮退订和启用它终止时,你可以很容易地获得相同的结果,使按钮不可点击

相关内容

  • 没有找到相关文章

最新更新