使用背压按特定顺序向订阅者发出事件的主题



想象一下,你向其发送事件的一个订阅者管道,它访问了一个又一个订阅者。

具有发布主题和 x 订阅者/可观察量。通常,事件以特定顺序发出给观察者,但同时发出,而不管观察者何时返回。是否可以执行此流程:

  1. 向观察者发出事件
  2. A
  3. osbserverA 返回后,将事件发送给观察者 B
  4. 在观察者 B 返回后,将事件发送到观察者 C

我正在使用 RxScala 和 Monifu Rx 实现

Monifu 甚至有一个背压实现:

def onNext(elem: T): Future[Ack]

我希望看到"结果是:更改了!

  val subject = PublishSubject[Int]()
  var result = "Not Changed"
  subject.subscribe { i =>
    Observable.timerOneTime(3.seconds, Continue).asFuture.map { x =>
      result = "Changed !!"
      x.get
    }
  }
  subject.subscribe { i =>
    Observable.timerOneTime(1.seconds, Continue).asFuture.map { x =>
      println("And Result was : " + result)
      x.get
    }
  }
  subject.onNext(1)

在 RxScala/RxJava 或 Monifu 中是否可以在不扩展 Subject 和覆盖 Next 实现的情况下?无论如何,这些类都被宣布为最终的,所以这将是相当黑客。

我认为答案是一个自定义的主题实现,就像在Monifu中这样,它将以flatMap的方式为观察者提供信息(忽略PublishSubject是最终类的事实):

class PipeSubject extends PublishSubject[RxEvent] {
  override def onNext(elem: RxEvent): Future[Ack] = {
    if (!isCompleted) {
      val observers = subscriptions
      if (observers.nonEmpty)
        pipeThroughMany(observers, elem)
      else
        Continue
    }
    else
      Cancel
  }
 private[this] def pipeThroughMany(array: Array[Observer[T]], elem: T): Future[Continue] = {
    val length = array.length
    def >>>(idx: Int = 0): Future[Continue] = {
      val obs = array(idx)
      obs.onNext(elem).flatMap {
         case Continue =>
           if (idx+1 < length)
              >>>(idx+1)
           else
             Continue
         case _ =>
           removeSubscription(obs)
           Continue
      }
    }
    >>>()
  }
}

相关内容

  • 没有找到相关文章

最新更新