limitedParallelism与固定线程池调度程序的区别是什么?



我试图使用Kotlin协程并发地执行多个HTTP调用,而不是一次执行一个,但我想避免并发地进行所有调用,以避免外部API的速率限制。

如果我只是为每个请求启动一个协程,它们几乎都是立即发送的。因此,我研究了limitedParallelism函数,它听起来非常接近我需要的,一些堆栈溢出的答案建议是推荐的解决方案。对于相同问题的旧答案建议使用newFixedThreadPoolContext。

该函数的文档提到,如果不需要单独的线程池,limitedParallelism是首选的替代方案;

如果您不需要单独的线程池,但必须限制调度程序的有效并行性,则建议使用CoroutineDispatcher。limitedParallelism。

然而,当我写代码使用limitedParallelism时,与newFixedThreadPoolContext相比,它并没有减少并发调用的数量。

在下面的示例中,我将网络调用替换为Thread.sleep,这不会改变行为。


// method 1
val fixedThreadPoolContext = newFixedThreadPoolContext(2)
// method 2
val limitedParallelismContext = Dispatchers.IO.limitedParallelism(2)
runBlocking {
val jobs = (1..1000).map {
// swap out the dispatcher here
launch(limitedParallelismContext) {
println("started $it")
Thread.sleep(1000)
println("    finished $it")
}
}
jobs.joinAll()
}

fixedThreadPoolContext的行为与预期一致,一次运行的协程不超过2个,完成的总时间是几分钟(每次1000次1秒,每次除以2,大约500秒)。

然而,对于limitedParallelismContext,所有的"开始#"行立即打印,一秒钟后,所有"完成"。打印行,程序在1秒内完成。

为什么limitedParallelism与使用单独的线程池没有相同的效果?完成了什么?

我稍微修改了您的代码,以便每个协程需要200ms才能完成,并在完成时打印时间。然后我把它粘贴到play。kotlinlang。org来检查:

/**
* You can edit, run, and share this code.
* play.kotlinlang.org
*/
import kotlinx.coroutines.*
fun main() {
// method 1
val fixedThreadPoolContext = newFixedThreadPoolContext(2, "Pool")
// method 2
val limitedParallelismContext = Dispatchers.IO.limitedParallelism(2)
runBlocking {
val jobs = (1..10).map {
// swap out the dispatcher here
launch(limitedParallelismContext) {
println("it at ${System.currentTimeMillis()}")
Thread.sleep(200)
}
}
jobs.joinAll()
}
}

在那里使用kotlin 1.6.21的结果是预期的:

it at 1652887163155
it at 1652887163157
it at 1652887163358
it at 1652887163358
it at 1652887163559
it at 1652887163559
it at 1652887163759
it at 1652887163759
it at 1652887163959
it at 1652887163959

一次只能执行2个协程。

您也可以这样做,以保持循环同步并批处理调用。操场上的例子:

import kotlinx.coroutines.*
fun main() {
runBlocking {
val start = System.currentTimeMillis()
fun t(): Long = System.currentTimeMillis() - start
(1..50).chunked(2).forEach { ids ->
println("launching $ids at ${t()}")
ids.map {
launch(Dispatchers.IO) {
delay(100)
println("finished $it at ${t()}")
}
}.joinAll()
}
}
}

打印:

launching [1, 2] at 24
finished 2 at 161
finished 1 at 161
launching [3, 4] at 163
finished 3 at 264
finished 4 at 264
launching [5, 6] at 264
finished 5 at 365
finished 6 at 365
launching [7, 8] at 365
finished 7 at 465
finished 8 at 465
launching [9, 10] at 466
finished 9 at 566
finished 10 at 566

相关内容

最新更新