Monix Task.sleep 和单线程执行



我正在尝试理解Monix中的任务调度原则。 以下代码(源:https://slides.com/avasil/fp-concurrency-scalamatsuri2019#/4/3(仅生成"1",如预期的那样。

val s1: Scheduler = Scheduler(
ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()),
ExecutionModel.SynchronousExecution)
def repeat(id: Int): Task[Unit] =
Task(println(s"$id ${Thread.currentThread().getName}")) >> repeat(id)
val prog: Task[(Unit, Unit)] = (repeat(1), repeat(2)).parTupled
prog.runToFuture(s1)
// Output:
// 1 pool-1-thread-1
// 1 pool-1-thread-1
// 1 pool-1-thread-1
// ...

当我们向repeat方法添加Task.sleep

def repeat(id: Int): Task[Unit] =
Task(println(s"$id ${Thread.currentThread().getName}")) >>
Task.sleep(1.millis) >> repeat(id)

输出更改为

// Output
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// ...

这两个任务现在在单个线程上同时执行!不错的:) 一些合作屈服已经开始。这里到底发生了什么?谢谢:)

编辑:同样的事情发生在Task.shift而不是Task.sleep

我不确定这是否是您正在寻找的答案,但这里是:

尽管命名表明并非如此,但Task.sleep无法与更传统的方法(如Thread.sleep(进行比较。

Task.sleep实际上并不在线程上运行,而只是指示调度程序在经过的时间后运行回调。

以下是monix/TaskSleep.scala中的一些代码片段,用于比较:

[...]
implicit val s = ctx.scheduler
val c = TaskConnectionRef()
ctx.connection.push(c.cancel)
c := ctx.scheduler.scheduleOnce(
timespan.length,
timespan.unit,
new SleepRunnable(ctx, cb)
)
[...]
private final class SleepRunnable(ctx: Context, cb: Callback[Throwable, Unit]) extends Runnable {
def run(): Unit = {
ctx.connection.pop()
// We had an async boundary, as we must reset the frame
ctx.frameRef.reset()
cb.onSuccess(())
}
}
[...]

在执行回调(这里:cb(之前的一段时间内,你的单线程调度程序(这里:ctx.scheduler(可以简单地使用他的线程进行接下来排队的任何计算。

这也解释了为什么这种方法更可取,因为我们不会在休眠间隔期间阻塞线程 - 浪费更少的计算周期。

希望这有帮助。

扩展马库斯的答案。

作为心智模型(出于说明目的(,您可以将线程池想象成堆栈。由于您只有一个执行程序线程池,因此它将尝试先运行repeat1,然后再运行repeat2

在内部,一切都只是一个巨大的FlatMap.运行循环将根据执行模型调度所有任务。

发生的情况是,sleep调度一个可运行的线程池。它将可运行的(repeat1(推到堆栈的顶部,从而给repeat2运行的机会。同样的事情也会发生在repeat2.

请注意,默认情况下,Monix的执行模型将为每个1024平面图执行异步边界。

相关内容

  • 没有找到相关文章

最新更新