我正在使用RxJava2,假设我有这个可观察的:
Observable
.create(emitter ->
SomeDependency.registerCallback(data -> emitter.onNext(data))
)
.subscribeOn(Schedulers.io());
它观察一些异步逻辑,然后发出从中获取的任何内容。重要的是要知道,在注册的回调中,数据是在由SomeDependency
处理的线程上传递的。因此,这会导致所有emitter
的排放到下游,而忽略定义的Scheduler
。
https://stackoverflow.com/a/43283760/1618316 中,@akarnokd有一个提示,指示一种使用Scheduler.Worker
将数据重定向到正确线程的方法。这种方法的修改示例如下所示:
Observable
.create(emitter -> {
final Worker worker = Schedulers.trampoline().createWorker();
emitter.setDisposable(worker);
SomeDependency.registerCallback(data ->
worker.schedule(() -> emitter.onNext(data))
)
})
.subscribeOn(Schedulers.io());
注意:
trampoline()
为当前线程创建Scheduler
。在我们的例子中,io()
线程,我们已经定义了创建我们的可观察的线程。
问题是,在创建这种可观察量时,您通常不仅需要registerCallback()
,还需要unregisterCallback()
。在常见情况下,您将unregisterCallback()
放入emitter
的Disposable
中。但是,如您所见,我们的emitter
已经有一个Disposable
,无法设置另一个。如果要设置第二个Disposable
,则取消设置并释放前一个。
您对如何处理这个问题有什么想法吗?
我认为你需要的是CompositeDisposable
.
可以容纳多个其他一次性用品的一次性容器。