我需要启动许多将返回结果的作业。
在主代码(不是不是)中strong>对于给定的超时到期,以先到者为准。
如果我退出等待,因为所有工作都在超时之前完成,那太好了,我会收集他们的结果。
但是,如果某些工作要比超时时间更长,我的主要功能需要在超时到期后立即醒来,请检查哪些作业确实及时完成(如果有),并且仍在运行哪些工作,并从在那里,没有取消仍在运行的作业。
您将如何编码这种等待?
解决方案直接从问题中遵循。首先,我们将为任务设计一个悬挂功能。让我们看看我们的要求:
如果某些工作要花费更长的超时...而不取消仍在运行的工作。
这意味着我们发起的工作必须是独立的(不是孩子),因此我们会选择退出结构化并发,并使用GlobalScope
启动它们,并手动收集所有工作。我们使用async
Coroutine Builder,因为我们计划以后收集某种类型的R
的结果:
val jobs: List<Deferred<R>> = List(numberOfJobs) {
GlobalScope.async { /* our code that produces R */ }
}
启动工作后,我需要等待他们所有人完成任务或给定超时到期,以先到者为准。
让我们等待所有这些,然后等待超时:
withTimeoutOrNull(timeoutMillis) { jobs.joinAll() }
我们使用joinAll
(而不是awaitAll
)来避免例外,如果其中一个作业失败,并且withTimeoutOrNull
避免了超时的例外。
我的主要功能需要在超时到期后立即唤醒,检查哪些作业确实在及时完成(如果有),哪些工作仍在运行
jobs.map { deferred -> /* ... inspect results */ }
在主代码(不是Coroutine)中...
由于我们的主代码不是Coroutine,因此必须以阻止方式等待,因此我们使用runBlocking
编写的代码桥接。将所有内容放在一起:
fun awaitResultsWithTimeoutBlocking(
timeoutMillis: Long,
numberOfJobs: Int
) = runBlocking {
val jobs: List<Deferred<R>> = List(numberOfJobs) {
GlobalScope.async { /* our code that produces R */ }
}
withTimeoutOrNull(timeoutMillis) { jobs.joinAll() }
jobs.map { deferred -> /* ... inspect results */ }
}
P.S。我不建议在任何一种严重的生产环境中部署这种解决方案,因为让您的后台作业(泄漏)在超时后会稍稍咬您。只有当您透彻地理解这种方法的所有缺陷和风险时,才这样做。
您可以尝试使用whileSelect
和onTimeout
子句。但是,您仍然必须克服您的主要代码不是Coroutine的问题。下行是whileSelect
语句的示例。该函数返回一个Deferred
,其中包含在超时期间评估的结果列表,并在未完成的结果的Deferred
s列表中进行了另一个列表。
fun CoroutineScope.runWithTimeout(timeoutMs: Int): Deferred<Pair<List<Int>, List<Deferred<Int>>>> = async {
val deferredList = (1..100).mapTo(mutableListOf()) {
async {
val random = Random.nextInt(0, 100)
delay(random.toLong())
random
}
}
val finished = mutableListOf<Int>()
val endTime = System.currentTimeMillis() + timeoutMs
whileSelect {
var waitTime = endTime - System.currentTimeMillis()
onTimeout(waitTime) {
false
}
deferredList.toList().forEach { deferred ->
deferred.onAwait { random ->
deferredList.remove(deferred)
finished.add(random)
true
}
}
}
finished.toList() to deferredList.toList()
}
在主代码中,您可以使用灰心的方法runBlocking
访问Deferrred
。
fun main() = runBlocking<Unit> {
val deferredResult = runWithTimeout(75)
val (finished, pending) = deferredResult.await()
println("Finished: ${finished.size} vs Pending: ${pending.size}")
}
这是我提出的解决方案。将每个工作与状态配对(除其他信息):
private enum class State { WAIT, DONE, ... }
private data class MyJob(
val job: Deferred<...>,
var state: State = State.WAIT,
...
)
写一个明确的循环:
// wait until either all jobs complete, or a timeout is reached
val waitJob = launch { delay(TIMEOUT_MS) }
while (waitJob.isActive && myJobs.any { it.state == State.WAIT }) {
select<Unit> {
waitJob.onJoin {}
myJobs.filter { it.state == State.WAIT }.forEach {
it.job.onJoin {}
}
}
// mark any finished jobs as DONE to exclude them from the next loop
myJobs.filter { !it.job.isActive }.forEach {
it.state = State.DONE
}
}
初始状态称为等待(而不是运行),因为这不一定意味着该作业仍在运行,只是我的循环尚未考虑到它。
我很想知道这是否足够惯用,还是有更好的方法来编码这种行为。