Scala理解的未来:顺序与并行



这里有SeqPar对象,它包含一个task例程,这是一个简单的模拟Future,它打印出一些调试信息并返回Future[Int]类型。

问题是:为什么experiment1被允许并行运行,而experiment2总是按顺序运行?

object SeqPar {
def experiment1: Int = {
val f1 = task(1)
val f2 = task(2)
val f3 = task(3)
val computation = for {
r1 <- f1
r2 <- f2
r3 <- f3
} yield (r1 + r2 + r3)
Await.result(computation, Duration.Inf)
}
def experiment2: Int = {
val computation = for {
r1 <- task(1)
r2 <- task(2)
r3 <- task(3)
} yield (r1 + r2 + r3)
Await.result(computation, Duration.Inf)
}
def task(i: Int): Future[Int] = {
Future {
println(s"task=$i thread=${Thread.currentThread().getId} time=${System.currentTimeMillis()}")
i * i
}
}
}

当我运行experiment1时,它会打印出来:

task=3 thread=24 time=1541326607613
task=1 thread=22 time=1541326607613
task=2 thread=21 time=1541326607613

experiment2:

task=1 thread=21 time=1541326610653
task=2 thread=20 time=1541326610653
task=3 thread=21 time=1541326610654

观察到差异的原因是什么?我确实知道for的理解和f1.flatMap(r1 => f2.flatMap(r2 => f3.map(r3 => r1 + r2 + r3)))一样下降了,但我仍然忽略了一点,为什么一个可以并行运行,而另一个不能并行运行。

这是Future(…)flatMap的作用:

  • val future = Future(task)开始并行运行任务
  • future.flatMap(result => task)安排在future完成时运行task

请注意,future.flatMap(result => task)无法在future完成之前开始并行运行任务,因为要运行task,我们需要result,只有在future完成时才可用。

现在让我们看看您的example1:

def experiment1: Int = {
// construct three independent tasks and start running them
val f1 = task(1)
val f2 = task(2)
val f3 = task(3)
// construct one complicated task that is ...
val computation =
// ... waiting for f1 and then ...
f1.flatMap(r1 =>
// ... waiting for f2 and then ...
f2.flatMap(r2 =>
// ... waiting for f3 and then ...
f3.map(r3 =>
// ... adding some numbers.
r1 + r2 + r3)))
// now actually trigger all the waiting
Await.result(computation, Duration.Inf)
}

因此,在example1中,由于所有三个任务花费的时间相同,并且都是在同一时间启动的,因此我们可能只需要在等待f1时进行阻塞。当我们四处等待f2时,它的结果应该已经存在了。

example2有何不同?

def experiment2: Int = {
// construct one complicated task that is ...
val computation =
// ... starting task1 and then waiting for it and then ...
task(1).flatMap(r1 =>
// ... starting task2 and then waiting for it and then ...
task(2).flatMap(r2 =>
// ... starting task3 and then waiting for it and then ...
task(3).map(r3 =>
// ... adding some numbers.
r1 + r2 + r3)))
// now actually trigger all the waiting and the starting of tasks
Await.result(computation, Duration.Inf)
}

在本例中,在等待task(1)完成之前,我们甚至没有构建task(2),因此任务无法并行运行。

因此,当使用Scala的Future进行编程时,您必须通过在example1example2这样的代码之间正确选择来控制并发性。或者,您可以研究那些对并发性提供更明确控制的库。

这是因为Scala Futures是严格的。Future内部的操作在Future创建后立即执行,然后将其值存储起来。因此,您正在失去引用透明度。在您的情况下,您的期货是在第一个任务调用中执行的,结果会被存储。它们不会在for内再次执行。在第二种情况下,为了理解而在你身上创造了未来,结果是正确的。

最新更新