使用 RxJava 构建"task queue"的最佳方式



目前我正在研究许多与网络相关的功能。目前,我正在处理一个网络通道,该通道允许我一次发送 1 条信息,我必须等待它被确认才能发送下一条信息。我用1..n连接的客户端代表服务器。

其中一些消息,我必须以块的形式发送,这在RxJava中相当容易做到。目前我的"写作"方法看起来像这样:

fun write(bytes: ByteArray, ignoreMtu: Boolean) = 
server.deviceList()
.first(emptyList())
.flatMapObservable { devices ->
Single.fromCallable {
if (ignoreMtu) {
bytes.size
} else {
devices.minBy { device -> device.mtu }?.mtu ?: DEFAULT_MTU
}
}
.flatMapObservable { minMtu ->
Observable.fromIterable(bytes.asIterable())
.buffer(minMtu)
}
.map { it.toByteArray() }
.doOnNext { server.currentData = bytes }
.map { devices }
// part i've left out: waiting for each device acknowledging the message, timeouts, etc.
}

这里缺少的是我只允许同时发送one一条信息的部分。此外,我要求的是,如果我将消息添加到队列中,我必须能够仅观察此消息的状态(已完成,错误)。

我已经考虑过实现这一目标的最优雅的方法是什么。我想出的解决方案包括例如一个PublishSubject<ByteArray>,我在其中推送消息(类似队列),添加订阅者并观察它 - 但如果上一条消息失败,这将抛出例如onError

我想到的另一种方法是在创建/排队时给每条消息一个数字,并有一个全局"消息计数器"可观察的,我会用currently sent message == MY_MESSAGE_IDfilter钩到链的开头。但这感觉有点脆弱。每当订阅终止时,我都可以增加计数器,但我相信一定有更好的方法来实现我的目标。

感谢您的帮助。

供将来参考:我发现的最直接的方法是添加一个在单个线程上工作的调度程序,从而按顺序处理每个任务。

最新更新