想象一下,你向其发送事件的一个订阅者管道,它访问了一个又一个订阅者。
具有发布主题和 x 订阅者/可观察量。通常,事件以特定顺序发出给观察者,但同时发出,而不管观察者何时返回。是否可以执行此流程:
- 向观察者发出事件 A
- osbserverA 返回后,将事件发送给观察者 B
- 在观察者 B 返回后,将事件发送到观察者 C
我正在使用 RxScala 和 Monifu Rx 实现
Monifu 甚至有一个背压实现:
def onNext(elem: T): Future[Ack]
我希望看到"结果是:更改了!
val subject = PublishSubject[Int]()
var result = "Not Changed"
subject.subscribe { i =>
Observable.timerOneTime(3.seconds, Continue).asFuture.map { x =>
result = "Changed !!"
x.get
}
}
subject.subscribe { i =>
Observable.timerOneTime(1.seconds, Continue).asFuture.map { x =>
println("And Result was : " + result)
x.get
}
}
subject.onNext(1)
在 RxScala/RxJava 或 Monifu 中是否可以在不扩展 Subject 和覆盖 Next 实现的情况下?无论如何,这些类都被宣布为最终的,所以这将是相当黑客。
我认为答案是一个自定义的主题实现,就像在Monifu中这样,它将以flatMap的方式为观察者提供信息(忽略PublishSubject是最终类的事实):
class PipeSubject extends PublishSubject[RxEvent] {
override def onNext(elem: RxEvent): Future[Ack] = {
if (!isCompleted) {
val observers = subscriptions
if (observers.nonEmpty)
pipeThroughMany(observers, elem)
else
Continue
}
else
Cancel
}
private[this] def pipeThroughMany(array: Array[Observer[T]], elem: T): Future[Continue] = {
val length = array.length
def >>>(idx: Int = 0): Future[Continue] = {
val obs = array(idx)
obs.onNext(elem).flatMap {
case Continue =>
if (idx+1 < length)
>>>(idx+1)
else
Continue
case _ =>
removeSubscription(obs)
Continue
}
}
>>>()
}
}