可观察.创建和观察者处置



我试图理解当我使用

    Observable.just(1).subscribe(new Observer<Integer>() {
        Disposable disposable;
        @Override
        public void onSubscribe(Disposable disposable) {
            System.out.println("Subscribed");
            this.disposable = disposable;
        }
        @Override
        public void onNext(Integer integer) {
            System.out.println(integer);
            System.out.println(disposable.isDisposed());
        }
        @Override
        public void onError(Throwable throwable) {
            System.out.println("Error");
            System.out.println(disposable.isDisposed());
        }
        @Override
        public void onComplete() {
            System.out.println("Complete");
            System.out.println(disposable.isDisposed());
        }
    })

OnCompleteOnError后,disposable.isDisposed()返回true,就像我使用时一样

  Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
            if (!observableEmitter.isDisposed())
                observableEmitter.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
        Disposable disposable;
        @Override
        public void onSubscribe(Disposable disposable) {
            System.out.println("Subscribed");
            this.disposable = disposable;
        }
        @Override
        public void onNext(Integer integer) {
            System.out.println(integer);
            System.out.println(disposable.isDisposed());
        }
        @Override
        public void onError(Throwable throwable) {
            System.out.println("Error");
            System.out.println(disposable.isDisposed());
        }
        @Override
        public void onComplete() {
            System.out.println("Complete");
            System.out.println(disposable.isDisposed());
        }
    }); 

我看到disposable.isDisposed()返回错误。有人可以解释一下到底发生了什么吗?我理解一个写得很好的 Observable.create 在 onComplete()onError() 之后不能发出项目。

Disposable只保存对subscription的引用,为了释放它,你需要调用disposable.dispose()Observable不会在完整方法上释放Disposable实例。

 @Override
        public void onComplete() {
            System.out.println("Complete");
            disposable.dispose();
            System.out.println(disposable.isDisposed());
        }

最初,isDisposed旨在表示dispose被调用。由于 onErroronComplete 默认情况下不会调用dispose,因此终止后通常不会isDisposed true。Reactive Streams 规范指出,当调用 onErroronComplete 时,订阅应被视为已取消,这意味着实现中不需要存在实际代码(因此开销(。

不幸的是,人们开始将其用作流之外的完成指示器(即,查看特定subscribe()是否已达到其最终状态(。我不建议依赖isDisposed(Disposable容器外部(,因为它将完成的异步反应与等待完成的可能同步(和阻塞(混为一谈。

在您的特定情况下,disposable将在onComplete返回后指示isDisposed,因此外部视图将指示isDisposed为真。

最新更新