假设您在rxjava-scala-0.18.4
Observable
@volatile var dorun = true
var subscriber: Subscriber[String] = null
val myObs = Observable { obs: Subscriber[String] =>
subscriber = obs
Subscription { println("unsubscribed"); dorun = false }
}
val sub = myObs.head.subscribe(println(_))
assertTrue(dorun)
subscriber.onNext("hello")
Thread.sleep(500)
assertFalse(dorun)
subscriber.onNext("world")
Thread.sleep(500)
assertFalse(dorun)
第二个断言失败,这意味着head
没有取消订阅。我对可观察对象的理解是错误的,还是应该在发出第一个元素后取消订阅head
?
看一下您的subscribe()
方法:您循环直到run
被设置为false
,但是发生这种情况的唯一方法是关闭订阅。问题是还没有人订阅:循环使您无法返回。head
操作符不能在发送第一个项目之后终止底层订阅,因为它还没有完成的订阅。因此,你只是一直循环下去。
一个解决方案是将循环移动到Schedulers.trampoline()
上调度的操作中。然后事件将在从subscribe()
返回一段时间后交付。
此外,在您的subscribe()
方法中,似乎您需要将新的订阅对象添加到传入的Subscriber
中,如下所示:
val myObs = Observable {
obs: rx.lang.scala.Subscriber[String] =>
...
obs.add(
Subscription {
dorun = false
println("unsubscribed")
}
)
}