为什么头部不取消订阅

  • 本文关键字:取消 头部 scala rx-java
  • 更新时间 :
  • 英文 :


假设您在rxjava-scala-0.18.4

中有以下Observable
@volatile var dorun = true
var subscriber: Subscriber[String] = null
val myObs = Observable { obs: Subscriber[String] =>
  subscriber = obs
  Subscription { println("unsubscribed"); dorun = false }
}
val sub = myObs.head.subscribe(println(_))
assertTrue(dorun)
subscriber.onNext("hello")
Thread.sleep(500)
assertFalse(dorun)
subscriber.onNext("world")
Thread.sleep(500)
assertFalse(dorun)

第二个断言失败,这意味着head没有取消订阅。我对可观察对象的理解是错误的,还是应该在发出第一个元素后取消订阅head ?

看一下您的subscribe()方法:您循环直到run被设置为false,但是发生这种情况的唯一方法是关闭订阅。问题是还没有人订阅:循环使您无法返回。head操作符不能在发送第一个项目之后终止底层订阅,因为它还没有完成的订阅。因此,你只是一直循环下去。

一个解决方案是将循环移动到Schedulers.trampoline()上调度的操作中。然后事件将在从subscribe()返回一段时间后交付。

此外,在您的subscribe()方法中,似乎您需要将新的订阅对象添加到传入的Subscriber中,如下所示:

val myObs = Observable {
    obs: rx.lang.scala.Subscriber[String] =>
        ...
        obs.add(
            Subscription {
                dorun = false
                println("unsubscribed")
            }
        )
}

相关内容

  • 没有找到相关文章

最新更新