并行协程执行超时,并在结束时合并结果



我正试图用协程实现一种琐碎的用例:发送并行请求,然后等待所有请求返回并将结果合并到一个列表中。我使用下面的逻辑,但不知何故,它并没有等待所有响应,而是在第一个响应完成后完成(转到flatn(((。我做错了什么?

fun run() {
GlobalScope.launch  {
running = true
results =
providers
.map { provider -> async { provider.retrieve() } }
.map { retrieval ->
try {
withTimeout(2000L) {
retrieval.await()
}
} catch (ex: CancellationException) {
arrayListOf<Pair<String, String>>()
}
}
.flatten()
running = false
notifyObservers()
}
}

我在您的代码中看不到任何错误。不管怎样,让我们看看我能做些什么,也许这会有所帮助。

我假设您的提供者retrieve()返回某种类型的T的列表

让我们创建一个类似的Provider类,该类具有一个返回Int:列表的挂起函数

class Provider(val name: String) {
suspend fun execute(): List<Int>
}

然后让我们创建一个3个提供商的列表:

val providers: List<Provider> = listOf(Provider("p1"), Provider("p2"), Provider("p3"))

使用列表上的map()函数,我们使用async()函数将它们封装在Deferred中:

val deferredList: List<Deferred<Int>> = providers.map { provider ->
async { provider.execute() }
}

执行延迟

现在我们有两个选项,要么我们执行另一个map操作,然后在每个Deferred:上调用await()

val result: List<List<Int>> = deferredList.map { it.await } }

或者我们使用扩展函数awaitAll((,并将实际结果作为整数列表:

val result: List<List<Int>> = deferredList.awaitAll()

然后我们可以使用flatten()来压平结果

把所有东西放在一起

让我们创建一个函数,在完成所有挂起调用后,该函数获取提供程序列表并返回Int列表。

suspend fun executeAllProvidersConcurrently(providers: List<Provider>): List<Int> = withContext(Dispatchers.Default){
return@withContext providers.map {
async { it.execute() }
}.awaitAll().flatten()
}
launch {
println(executeAllProvidersConcurrently(providers))
}

正如你所看到的,我做的并没有太大的不同。我创建了一个Gist,您可以在其中获得完整的示例代码并自己运行。

希望这将帮助您同时运行Coroutines并获取所有结果。

最新更新