我想为monix.reactive.Observable
写一个拆分函数。它应该根据谓词的值将源Observable[A]
拆分为一个新的(Observable[A], Observable[A])
对,并根据源中的每个元素进行评估。我希望拆分独立于源可观察是热还是冷而工作。在源为冷的情况下,新的可观察量对也应该是冷的,在源较热的地方,新的可观察量对将是热的。我想知道这样的实现是否可行,如果是,如何实现(我在下面粘贴了一个失败的测试用例(。
签名作为隐式类上的方法,看起来像或类似于
/**
* Split an observable by a predicate, placing values for which the predicate returns true
* to the right (and values for which the predicate returns false to the left).
* This is consistent with the convention adopted by Either.cond.
*/
def split(p: T => Boolean)(implicit scheduler: Scheduler, taskLike: TaskLike[Future]): (Observable[T], Observable[T]) = {
splitEither[T, T](elem => Either.cond(p(elem), elem, elem))
}
目前,我有一个朴素的实现,它使用源元素并将它们推送到PublishSubject
。因此,新的可观察量对很热门。我对冷可观察的测试失败了。
import monix.eval.TaskLike
import monix.execution.{Ack, Scheduler}
import monix.reactive.{Observable, Observer}
import monix.reactive.subjects.PublishSubject
import scala.concurrent.Future
object ObservableOps {
implicit class ObservableExtensions[T](o: Observable[T]) {
/**
* Split an observable by a predicate, placing values for which the predicate returns true
* to the right (and values for which the predicate returns false to the left).
* This is consistent with the convention adopted by Either.cond.
*/
def split(p: T => Boolean)(implicit scheduler: Scheduler, taskLike: TaskLike[Future]): (Observable[T], Observable[T]) = {
splitEither[T, T](elem => Either.cond(p(elem), elem, elem))
}
/**
* Split an observable into a pair of Observables, one left, one right, according
* to a determinant function.
*/
def splitEither[U, V](f: T => Either[U, V])(implicit scheduler: Scheduler, taskLike: TaskLike[Future]): (Observable[U], Observable[V]) = {
val l = PublishSubject[U]()
val r = PublishSubject[V]()
o.subscribe(new Observer[T] {
override def onNext(elem: T): Future[Ack] = {
f(elem) match {
case Left(u) => l.onNext(u)
case Right(v) => r.onNext(v)
}
}
override def onError(ex: Throwable): Unit = {
l.onError(ex)
r.onError(ex)
}
override def onComplete(): Unit = {
l.onComplete()
r.onComplete()
}
})
(l, r)
}
}
}
//////////
import ObservableOps._
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import monix.reactive.subjects.PublishSubject
import org.scalatest.FlatSpec
import org.scalatest.Matchers._
import org.scalatest.concurrent.ScalaFutures._
class ObservableOpsSpec extends FlatSpec {
val isEven: Int => Boolean = _ % 2 == 0
"Observable Ops" should "split a cold observable" in {
val o = Observable(1, 2, 3, 4, 5)
val (l, r) = o.split(isEven)
l.toListL.runToFuture.futureValue shouldBe List(1, 3, 5)
r.toListL.runToFuture.futureValue shouldBe List(2, 4)
}
"Observable Ops" should "split a hot observable" in {
val o = PublishSubject[Int]()
val (l, r) = o.split(isEven)
val lbuf = l.toListL.runToFuture
val rbuf = r.toListL.runToFuture
Observable.fromIterable(1 to 5).mapEvalF(i => o.onNext(i)).subscribe()
o.onComplete()
lbuf.futureValue shouldBe List(1, 3, 5)
rbuf.futureValue shouldBe List(2, 4)
}
}
我希望上面的两个测试用例都能通过,但"Observable Ops" should "split a cold observable"
失败了。
编辑:工作代码
通过这两个测试用例的实现如下所示:
import monix.execution.Scheduler
import monix.reactive.Observable
object ObservableOps {
implicit class ObservableExtension[T](o: Observable[T]) {
/**
* Split an observable by a predicate, placing values for which the predicate returns true
* to the right (and values for which the predicate returns false to the left).
* This is consistent with the convention adopted by Either.cond.
*/
def split(
p: T => Boolean
)(implicit scheduler: Scheduler): (Observable[T], Observable[T]) = {
splitEither[T, T](elem => Either.cond(p(elem), elem, elem))
}
/**
* Split an observable into a pair of Observables, one left, one right, according
* to a determinant function.
*/
def splitEither[U, V](
f: T => Either[U, V]
)(implicit scheduler: Scheduler): (Observable[U], Observable[V]) = {
val oo = o.map(f)
val l = oo.collect {
case Left(u) => u
}
val r = oo.collect {
case Right(v) => v
}
(l, r)
}
}
}
class ObservableOpsSpec extends FlatSpec {
val isEven: Int => Boolean = _ % 2 == 0
"Observable Ops" should "split a cold observable" in {
val o = Observable(1, 2, 3, 4, 5)
val o2 = o.publish
val (l, r) = o2.split(isEven)
val x= l.toListL.runToFuture
val y = r.toListL.runToFuture
o2.connect()
x.futureValue shouldBe List(1, 3, 5)
y.futureValue shouldBe List(2, 4)
}
"Observable Ops" should "split a hot observable" in {
val o = PublishSubject[Int]()
val (l, r) = o.split(isEven)
val lbuf = l.toListL.runToFuture
val rbuf = r.toListL.runToFuture
Observable.fromIterable(1 to 5).mapEvalF(i => o.onNext(i)).subscribe()
o.onComplete()
lbuf.futureValue shouldBe List(1, 3, 5)
rbuf.futureValue shouldBe List(2, 4)
}
}
根据定义,冷可观察对象是为每个订阅者延迟计算的。如果不对所有内容进行两次评估或将其转换为热的,则无法拆分它。
如果您不介意对所有内容进行两次评估,只需使用.filter
两次即可。 如果您不介意转换为热,请使用.publish
(或.publish.refCount
,这样您就不需要手动connect
(。 如果你想保留冷/热属性并并行处理两个部分,有一种publishSelector
方法可以让你在有限的范围内将任何可观察的东西当作热的处理:
coldOrHot.publishSelector { totallyHot =>
val s1 = totallyHot.filter(...).flatMap(...) // any processing
val s2 = totallyHot.filter(...).mapEval(...) // any processing 2
Observable(s1, s2).merge
}
除了作用域之外,它的限制是内部 lambda 的结果必须是另一个可观察的(将从 publishSelector 返回(,因此您不能拥有具有所需签名的帮助程序。但如果原版是冷的,结果还是会很冷。