如何在 RxJava 中的自定义可观察对象中获取观察者的取消订阅操作通知



我正在尝试将一些基于listener-pattern的API包装到Observable。我的代码大致如下。

def myObservable = Observable.create({ aSubscriber ->
    val listener = {event -> 
      aSubscriber.onNext(event);                
    }
    existingEventSource.addListener(listener)
})

然而,我希望我的observable在observable调用subscription.unsibe()时立即将侦听器从底层现有EventSource中删除。我该如何实现这一目标?

Subscriber抽象类实际上有一个方法add,它允许您添加将与订阅者取消订阅的Subscription

def myObservable = Observable.create({ aSubscriber ->
    val listener = {event -> 
      aSubscriber.onNext(event);                
    }
    existingEventSource.addListener(listener)
    // Adds a lambda to be executed when the Subscriber un-subscribes from your Observable
    aSubscriber.add(Subscriptions.create(() -> existingEventSource.removeListener(listener)));
})

aSubscriber想象成订阅了ObservableObserver;我们称之为Subscriber。只要Subscriber仍然订阅ObservableObservable就可以发出值。但当Subscriber取消订阅时,它应该停止。但是,如果我们想在Subscriber取消订阅时得到通知,我们可以注册一个Action,以便在它发生时运行。这就是add方法的用途。正如@dwursteisen在评论中提到的那样;您基本上是在注册一个lambda,该lambda将在订阅服务器取消订阅时执行。

也可以在其他计划程序上取消订阅。请参阅rxanroid项目中的MainThreadSubscription,了解如何实现这一点的示例。

下面是一个如何使用它在主线程上取消订阅的示例

aSubscriber.add(new MainThreadSubscription() {
    @Override
    protected void onUnsubscribe() {
        existingEventSource.removeListener(listener);
    }
});

最新更新