启动许多Coroutines,并与超时(无需取消)加入



我需要启动许多将返回结果的作业。

在主代码(不是不是)中strong>对于给定的超时到期,以先到者为准。

如果我退出等待,因为所有工作都在超时之前完成,那太好了,我会收集他们的结果。

但是,如果某些工作要比超时时间更长,我的主要功能需要在超时到期后立即醒来,请检查哪些作业确实及时完成(如果有),并且仍在运行哪些工作,并从在那里,没有取消仍在运行的作业。

您将如何编码这种等待?

解决方案直接从问题中遵循。首先,我们将为任务设计一个悬挂功能。让我们看看我们的要求:

如果某些工作要花费更长的超时...而不取消仍在运行的工作。

这意味着我们发起的工作必须是独立的(不是孩子),因此我们会选择退出结构化并发,并使用GlobalScope启动它们,并手动收集所有工作。我们使用async Coroutine Builder,因为我们计划以后收集某种类型的R的结果:

val jobs: List<Deferred<R>> = List(numberOfJobs) { 
    GlobalScope.async { /* our code that produces R */ }
}

启动工作后,我需要等待他们所有人完成任务或给定超时到期,以先到者为准。

让我们等待所有这些,然后等待超时:

withTimeoutOrNull(timeoutMillis) { jobs.joinAll() }

我们使用joinAll(而不是awaitAll)来避免例外,如果其中一个作业失败,并且withTimeoutOrNull避免了超时的例外。

我的主要功能需要在超时到期后立即唤醒,检查哪些作业确实在及时完成(如果有),哪些工作仍在运行

jobs.map { deferred -> /* ... inspect results */ }

在主代码(不是Coroutine)中...

由于我们的主代码不是Coroutine,因此必须以阻止方式等待,因此我们使用runBlocking编写的代码桥接。将所有内容放在一起:

fun awaitResultsWithTimeoutBlocking(
    timeoutMillis: Long,
    numberOfJobs: Int
) = runBlocking {
    val jobs: List<Deferred<R>> = List(numberOfJobs) { 
        GlobalScope.async { /* our code that produces R */ }
    }    
    withTimeoutOrNull(timeoutMillis) { jobs.joinAll() }
    jobs.map { deferred -> /* ... inspect results */ }
}

P.S。我不建议在任何一种严重的生产环境中部署这种解决方案,因为让您的后台作业(泄漏)在超时后会稍稍咬您。只有当您透彻地理解这种方法的所有缺陷和风险时,才这样做。

您可以尝试使用whileSelectonTimeout子句。但是,您仍然必须克服您的主要代码不是Coroutine的问题。下行是whileSelect语句的示例。该函数返回一个Deferred,其中包含在超时期间评估的结果列表,并在未完成的结果的Deferred s列表中进行了另一个列表。

fun CoroutineScope.runWithTimeout(timeoutMs: Int): Deferred<Pair<List<Int>, List<Deferred<Int>>>> = async {
    val deferredList = (1..100).mapTo(mutableListOf()) {
        async {
            val random = Random.nextInt(0, 100)
            delay(random.toLong())
            random
        }
    }
    val finished = mutableListOf<Int>()
    val endTime = System.currentTimeMillis() + timeoutMs
    whileSelect {
        var waitTime = endTime - System.currentTimeMillis()
        onTimeout(waitTime) {
            false
        }
        deferredList.toList().forEach { deferred ->
            deferred.onAwait { random ->
                deferredList.remove(deferred)
                finished.add(random)
                true
            }
        }
    }
    finished.toList() to deferredList.toList()
}

在主代码中,您可以使用灰心的方法runBlocking访问Deferrred

fun main() = runBlocking<Unit> {
    val deferredResult = runWithTimeout(75)
    val (finished, pending) = deferredResult.await()
    println("Finished: ${finished.size} vs Pending: ${pending.size}")
}

这是我提出的解决方案。将每个工作与状态配对(除其他信息):

private enum class State { WAIT, DONE, ... }
private data class MyJob(
    val job: Deferred<...>,
    var state: State = State.WAIT,
    ...
)

写一个明确的循环:

// wait until either all jobs complete, or a timeout is reached
val waitJob = launch { delay(TIMEOUT_MS) }
while (waitJob.isActive && myJobs.any { it.state == State.WAIT }) {
    select<Unit> {
        waitJob.onJoin {}
        myJobs.filter { it.state == State.WAIT }.forEach { 
            it.job.onJoin {}
        }
    }
    // mark any finished jobs as DONE to exclude them from the next loop
    myJobs.filter { !it.job.isActive }.forEach { 
        it.state = State.DONE
    }
}

初始状态称为等待(而不是运行),因为这不一定意味着该作业仍在运行,只是我的循环尚未考虑到它。

我很想知道这是否足够惯用,还是有更好的方法来编码这种行为。

相关内容

  • 没有找到相关文章

最新更新