我在Scala项目中使用RxJava,并说我有这个简单的Observable
:
Observable[String](observer => while (true) observer onNext "hi")
.subscribe(v => println(v))
println("hello")
我永远不会得到"hello"消息,因为while
阻塞了一个线程。我怎么能运行我的Observable
在一个单独的线程,以避免阻塞?
==================================
我认为observeOn
可以帮助,但它没有。运行这个:
val s = rx.lang.scala.schedulers.NewThreadScheduler.apply
Observable[String](observer => while (true) observer onNext "hi")
.observeOn(s).subscribe(v => println(v))
println("hello")
…仍然不打印"hello"。我想添加observeOn
使OnNext
在一个单独的线程中被调用,而不是while
块本身?
==================================
我当然可以把while
括在Future
中:
Observable[String](observer => Future { while (true) observer onNext "hi" })
.subscribe(v => println(v))
println("test") // "test" gets printed
但是也许有更多的习惯用法来做到这一点呢?
您需要使用subscribeOn
。例如,
Observable[String](observer => while (true) observer onNext "hi").subscribeOn(s)
.subscribe(v => println(v))
subscribeOn
将调用Scheduler
中的subscribe
函数,而observeOn
将向Scheduler
分发消息。