我想实现"调度程序继承"作为使用 RxJava2 的 API 的一部分。我希望我的 API 的使用者能够考虑构建单个处理链而不是 DAG,尽管在内部,新事件正在作为实现细节进行准备。
我看不出有任何方法可以做等效的事情:
observable
.flatMap {
val scheduler = Schedulers().current!!
someOtherObservable
.observeOn(scheduler)
}
有没有其他方法可以继承调度程序?
更多背景
我有一个管道,如下所示:
compositeDisposable += Environment
.lookupDeviceInfo()
.subscribeOn(scheduler)
.flatMap { deviceInfo ->
Device(deviceId = deviceInfo.id)
.sendCommand()
.subscribe(
{ result -> /*process result*/ },
{ e -> /*log error*/ })
对于消费者来说,这看起来像是他们将所有工作推送到指定的scheduler
:来自lookupDeviceInfo()
的事件被矢量化为来自该调度程序的工人,并且他们希望坚持该工作线程。
实际上,它们有一个错误,因为sendCommand()
来自另一个事件源的事件作为实现细节:
sendMessageSingle(deviceId, payload)
.flatMap { sentMessageId ->
responseObservable
.filter { it.messageId == sentMessageId }
.firstOrError()
}
事件从responseObservable
流入,但这些事件都不会被矢量化到指定的scheduler
,因为它被应用于上游。
从评论:
返回到同一个调度程序线程需要你提供一个单线程调度器(即Schedulers.from(Executor)
、Schedulers.single()
等(。没有当前调度程序,因为无法保证某些代码将在任何标准调度程序上运行;它们可以在系统的任意线程、其他框架等上执行。因此,您必须通过observeOn
将信号路由回所需的线程。
我不关心登陆同一个线程,只是同一个调度程序。(即使更换工作线程也可能没问题,只要新工作线程由与旧工作线程相同的调度程序提供服务。
然后建议仍然适用,您可以放弃我提到的"单线程"属性。