我不明白为什么在以下示例中添加executeWithFork
会阻止任务运行:
import java.util.concurrent.TimeUnit
import monix.execution.schedulers.SchedulerService
import monix.reactive.subjects.ConcurrentSubject
object Sandbox {
def main(args: Array[String]): Unit = {
implicit val scheduler: SchedulerService =
monix.execution.Scheduler(java.util.concurrent.Executors.newCachedThreadPool())
val input = ConcurrentSubject.publish[String]
// prints nothing
input.foreachL(println).executeWithFork.runAsync
// this works:
// input.foreachL(println).runAsync
input.onNext("one")
input.onNext("two")
scheduler.shutdown()
scheduler.awaitTermination(1, TimeUnit.MINUTES, monix.execution.Scheduler.Implicits.global)
}
}
您看到的行为是两个事实的结果:
-
使用
executeWithFork
为线程切换引入了一点额外的延迟 -
使用
ConcurrentSubject.publish
(例如与replay
相反)。如果你打开PublishSubject
的文档,你可能会看到
PublishSubject
仅向订阅服务器发送源在订阅时间之后发送的那些项目。
换句话说,发布"one"
和"two"
的主线程与必须订阅input
才能获取数据的分支线程之间存在竞争条件。结果取决于哪个线程赢得了比赛:在订阅之前发布的所有数据都将丢失。在我的硬件中,我几乎总是看到"two"
,偶尔甚至会看到"one"
,您的结果可能会有所不同。
测试这一点的最简单方法是在第一个input.onNext
之前添加Thread.sleep(100)
,并且每次都应该看到打印的两个事件。你也可以尝试推送更多的事件,而不仅仅是2个,以确保不是所有的东西都丢失了。