限制可在作用域中运行的最大协程数



我正在将我们当前的应用程序从 Java 翻译成 Kotlin,我遇到了这个问题。

用于使用线程从服务器传输数据的 java 实现。

它将创建大约 100 个不同的线程来请求数据,但从我所看到的情况来看,一次运行不超过 4 个,其他线程将在启动之前等待线程完成。

在将其转换为 Kotlin 时,我使用了协程

这会产生一个问题,因为显然服务器无法处理实际发送的 100 个请求。

所有协程都在同一范围内启动,所以它是这样的:

//this is a custom scope that launches on Dispatchers.IO + a job that I can use to cancel everything
transferScope.launch {
//loadData is a suspending function that returns true/false 
val futures = mDownloadJobs.map{ async { it.loadData() } }
val responses = futures.awaitAll()
//check that everything in responses is true etc....
}

有没有办法使特定的 transferScope 一次最多只允许 5 个协程,然后在一个完成后让另一个协程运行?(我不在乎订单(

如果无法通过范围完成,是否有其他方法可以实现?

要求每个协程在提出请求之前从总共 5 个许可证中获取 KotlinSemaphore许可证。

像这样:

import kotlinx.coroutines.sync.Semaphore
val requestSemaphore = Semaphore(5)
val futures = mDownloadJobs.map {
async {
// Will limit number of concurrent requests to 5
requestSemaphore.withPermit {
it.loadData()
}
}
}
val responses = futures.awaitAll()

可能的解决方案可能是

withContext(Dispatchers.Default.limitedParallelism(5)) {
#async invocation here
}

您可以执行类似操作,将请求分组为 4 个块,启动协程来处理它们,然后等到该组完成再启动新请求。

requests.chunked(4).forEachIndexed { index, chunk ->
coroutineScope {
LOG("processing chunk $index")
chunk.forEach {
launch {
delay(100)
}
}
LOG("done processing $index")
}
}

我相信你应该引导并限制你正在创建的协程的创建。

val channel = Channel<Job>()
transferScope.launch {
mDownloadJobs.forEach { channel.send(it) }
channel.close() // don't forget to close the channel
}
coroutineScope {
val responses = mutableListOf<Any>()
repeat(5).map { 
launch {
for (job in mDownloadJobsChannel) {
responses.add(jobs.loadData())
}
}
}
}

在这种情况下,并行化是 5 个协程。

我没有测试这段代码:D我相信有更清洁的方法可以做到这一点。

>Dispatchers.IO声称要创建一个线程池并将跨度限制在该池中。它的文档字符串告诉您如何更改池大小(系统属性(。

5 个工人示例

val semaphore = Semaphore(5)
coroutineScope {
list.map {
async {
semaphore.acquire()

// logic(it)

semaphore.release()
}
}.awaitAll()
}

最新更新