当另一个源进入管道时,如何实现调度程序继承?



我想实现"调度程序继承"作为使用 RxJava2 的 API 的一部分。我希望我的 API 的使用者能够考虑构建单个处理链而不是 DAG,尽管在内部,新事件正在作为实现细节进行准备。

我看不出有任何方法可以做等效的事情:

observable
.flatMap {
val scheduler = Schedulers().current!!
someOtherObservable
.observeOn(scheduler)
}

有没有其他方法可以继承调度程序?

更多背景

我有一个管道,如下所示:

compositeDisposable += Environment
.lookupDeviceInfo()
.subscribeOn(scheduler)
.flatMap { deviceInfo ->
Device(deviceId = deviceInfo.id)
.sendCommand()
.subscribe(
{ result -> /*process result*/ },
{ e -> /*log error*/ })

对于消费者来说,这看起来像是他们将所有工作推送到指定的scheduler:来自lookupDeviceInfo()的事件被矢量化为来自该调度程序的工人,并且他们希望坚持该工作线程。

实际上,它们有一个错误,因为sendCommand()来自另一个事件源的事件作为实现细节:

sendMessageSingle(deviceId, payload)
.flatMap { sentMessageId ->
responseObservable
.filter { it.messageId == sentMessageId }
.firstOrError()
}

事件从responseObservable流入,但这些事件都不会被矢量化到指定的scheduler,因为它被应用于上游。

从评论:

返回到同一个调度程序线程需要你提供一个单线程调度器(即Schedulers.from(Executor)Schedulers.single()等(。没有当前调度程序,因为无法保证某些代码将在任何标准调度程序上运行;它们可以在系统的任意线程、其他框架等上执行。因此,您必须通过observeOn将信号路由回所需的线程。

我不关心登陆同一个线程,只是同一个调度程序。(即使更换工作线程也可能没问题,只要新工作线程由与旧工作线程相同的调度程序提供服务。

然后建议仍然适用,您可以放弃我提到的"单线程"属性。

相关内容

  • 没有找到相关文章

最新更新