monix:Task.executeWithFork是否阻止执行



我不明白为什么在以下示例中添加executeWithFork会阻止任务运行:

import java.util.concurrent.TimeUnit
import monix.execution.schedulers.SchedulerService
import monix.reactive.subjects.ConcurrentSubject
object Sandbox {
def main(args: Array[String]): Unit = {
implicit val scheduler: SchedulerService =
monix.execution.Scheduler(java.util.concurrent.Executors.newCachedThreadPool())
val input = ConcurrentSubject.publish[String]
// prints nothing
input.foreachL(println).executeWithFork.runAsync
// this works:
// input.foreachL(println).runAsync
input.onNext("one")
input.onNext("two")
scheduler.shutdown()
scheduler.awaitTermination(1, TimeUnit.MINUTES, monix.execution.Scheduler.Implicits.global)
}
}

您看到的行为是两个事实的结果:

  1. 使用executeWithFork为线程切换引入了一点额外的延迟

  2. 使用ConcurrentSubject.publish(例如与replay相反)。如果你打开PublishSubject的文档,你可能会看到

PublishSubject仅向订阅服务器发送源在订阅时间之后发送的那些项目。

换句话说,发布"one""two"的主线程与必须订阅input才能获取数据的分支线程之间存在竞争条件。结果取决于哪个线程赢得了比赛:在订阅之前发布的所有数据都将丢失。在我的硬件中,我几乎总是看到"two",偶尔甚至会看到"one",您的结果可能会有所不同。

测试这一点的最简单方法是在第一个input.onNext之前添加Thread.sleep(100),并且每次都应该看到打印的两个事件。你也可以尝试推送更多的事件,而不仅仅是2个,以确保不是所有的东西都丢失了。

相关内容

  • 没有找到相关文章

最新更新