我正在将我们当前的应用程序从 Java 翻译成 Kotlin,我遇到了这个问题。
用于使用线程从服务器传输数据的 java 实现。
它将创建大约 100 个不同的线程来请求数据,但从我所看到的情况来看,一次运行不超过 4 个,其他线程将在启动之前等待线程完成。
在将其转换为 Kotlin 时,我使用了协程
这会产生一个问题,因为显然服务器无法处理实际发送的 100 个请求。
所有协程都在同一范围内启动,所以它是这样的:
//this is a custom scope that launches on Dispatchers.IO + a job that I can use to cancel everything
transferScope.launch {
//loadData is a suspending function that returns true/false
val futures = mDownloadJobs.map{ async { it.loadData() } }
val responses = futures.awaitAll()
//check that everything in responses is true etc....
}
有没有办法使特定的 transferScope 一次最多只允许 5 个协程,然后在一个完成后让另一个协程运行?(我不在乎订单(
如果无法通过范围完成,是否有其他方法可以实现?
要求每个协程在提出请求之前从总共 5 个许可证中获取 KotlinSemaphore
许可证。
像这样:
import kotlinx.coroutines.sync.Semaphore
val requestSemaphore = Semaphore(5)
val futures = mDownloadJobs.map {
async {
// Will limit number of concurrent requests to 5
requestSemaphore.withPermit {
it.loadData()
}
}
}
val responses = futures.awaitAll()
可能的解决方案可能是
withContext(Dispatchers.Default.limitedParallelism(5)) {
#async invocation here
}
您可以执行类似操作,将请求分组为 4 个块,启动协程来处理它们,然后等到该组完成再启动新请求。
requests.chunked(4).forEachIndexed { index, chunk ->
coroutineScope {
LOG("processing chunk $index")
chunk.forEach {
launch {
delay(100)
}
}
LOG("done processing $index")
}
}
我相信你应该引导并限制你正在创建的协程的创建。
val channel = Channel<Job>()
transferScope.launch {
mDownloadJobs.forEach { channel.send(it) }
channel.close() // don't forget to close the channel
}
coroutineScope {
val responses = mutableListOf<Any>()
repeat(5).map {
launch {
for (job in mDownloadJobsChannel) {
responses.add(jobs.loadData())
}
}
}
}
在这种情况下,并行化是 5 个协程。
我没有测试这段代码:D我相信有更清洁的方法可以做到这一点。
>Dispatchers.IO
声称要创建一个线程池并将跨度限制在该池中。它的文档字符串告诉您如何更改池大小(系统属性(。
5 个工人示例
val semaphore = Semaphore(5)
coroutineScope {
list.map {
async {
semaphore.acquire()
// logic(it)
semaphore.release()
}
}.awaitAll()
}