如何正确控制已创建可观察中已定义调度程序的排放量



我正在使用RxJava2,假设我有这个可观察的:

Observable
.create(emitter -> 
SomeDependency.registerCallback(data -> emitter.onNext(data))
)
.subscribeOn(Schedulers.io());

它观察一些异步逻辑,然后发出从中获取的任何内容。重要的是要知道,在注册的回调中,数据是在由SomeDependency处理的线程上传递的。因此,这会导致所有emitter的排放到下游,而忽略定义的Scheduler

https://stackoverflow.com/a/43283760/1618316 中,@akarnokd有一个提示,指示一种使用Scheduler.Worker将数据重定向到正确线程的方法。这种方法的修改示例如下所示:

Observable
.create(emitter -> {
final Worker worker = Schedulers.trampoline().createWorker();
emitter.setDisposable(worker);
SomeDependency.registerCallback(data -> 
worker.schedule(() -> emitter.onNext(data))
)
})
.subscribeOn(Schedulers.io());

注意:trampoline()为当前线程创建Scheduler。在我们的例子中,io()线程,我们已经定义了创建我们的可观察的线程。

问题是,在创建这种可观察量时,您通常不仅需要registerCallback(),还需要unregisterCallback()。在常见情况下,您将unregisterCallback()放入emitterDisposable中。但是,如您所见,我们的emitter已经有一个Disposable,无法设置另一个。如果要设置第二个Disposable,则取消设置并释放前一个。

您对如何处理这个问题有什么想法吗?

我认为你需要的是CompositeDisposable.

可以容纳多个其他一次性用品的一次性容器。

最新更新