是否存在一个事件,当所有的观察者都从一个可观察对象中取消订阅



我有一个Observable,我想要它是冷的,也就是说,它应该只在第一个观察者订阅它时才开始发射项目。

然后,我要确保当所有观察者取消订阅同一个可观察对象时,释放源中的所有资源。这可能吗?

你可以使用connectedoobservable的功能来为你处理这个问题:

//Replace Observable.range(1,1000) with your Observable implementation
Observable.range(1, 1000).doOnUnsubscribe(() -> freeResources()).share();

share方法调用publishrefCount方法。

publish将你的"normal" Observable转换为connectedoobservable,它将在你调用connect的那一刻开始发射项目。因此,从技术上讲,您可以订阅任意数量的观察者,然后调用connect开始同时为所有观察者发送项目。

refCount将您的connectedoobservable再次转换为传统的connectedoobservable,但具有新的特性!额外的好处是:这个可观察对象现在是冷的(只有当订阅者订阅时才开始发出,它在内部调用用publish创建的原始ConnectedObservable的connect方法),并跟踪有多少订阅者连接到原始ConnectedObservable。一旦所有订阅者都取消订阅,它将从源connectedoobservable取消订阅,因此逻辑变得简单得多,因为您只需要处理一个订阅。

这里有一个很好的共享操作图:http://reactivex.io/RxJava/javadoc/rx/Observable.html#share()

或者,如果这不够灵活,我认为你应该能够通过使用defer来创建一个冷观察对象,以及doOnSubscribedoOnUnsubscribe方法来轻松实现这种行为。

的例子:

    Observable.defer(() -> {
        final AtomicInteger counter = new AtomicInteger();
        return Observable.range(1, 1000)
                .doOnSubscribe(() -> counter.incrementAndGet())
                .doOnUnsubscribe(() -> {
                    if (counter.decrementAndGet() == 0) {
                        freeResources();
                    }
                });
    });

当第一个订阅者订阅时,这个可观察对象将开始发出一个数字序列(用你的可观察对象实现替换它),它将随着每次订阅增加一个计数器,并在所有订阅者取消订阅后释放已使用的资源(替换freeresource为任何你需要的)

相关内容

最新更新