更新Kotlin中不同线程的结果列表



我想用不同线程的结果更新一个列表。

mainFunction(): List<A> {
var x: List<A> = listOf<A>()
val job = ArrayList<Job>()
val ans = mainScope.async {
var i = 0
for (j in (0..5)) {
job.add(
launch {
val res = async {
func1()
}
x += res.await()
}
)
}
job.joinAll()
}
ans.await()
return x
}
fun func1(): List<A> {
//Perform some operation to get list abc
var abc: List<A> = listOf<A>()
delay(1000)
return abc
}

列表"x"没有得到正确的更新。有时,它会附加"res"…"有时并非如此。有没有一种线程安全的方法来修改这样的列表?

新实现:

mainFunction(): List<A> {
var x: List<A> = listOf<A>()
val ans = mainScope.async {
List(6) {
async{
func1()
}
}.awaitAll()
}
print(ans)
for (item in ans) {
x+= item as List<A> // item is of type Kotlin.Unit
}
}

简短回答

下面是您正在做的事情的一个简单版本,它避免了您可能遇到的同步问题:

suspend fun mainFunction(): List<A> {
return coroutineScope {
List(6) { async { func1() } }.awaitAll()
}
}

如果你想解开这个,你可以阅读长答案。我将解释原始代码中不同的东西,这些东西不是真正习惯的,可以替换。

长回答

问题中的代码中有多个非习惯用法的东西,所以我将尝试逐一解决它们。

为基于0的范围

索引for循环如果您只想重复操作几次,则使用repeat(6)而不是for (j in 0..5)更简单。它更容易阅读,特别是当您不需要索引变量时:

suspend fun mainFunction(): List<A> {
var x: List<A> = listOf<A>()
val job = ArrayList<Job>()
val ans = mainScope.async {
repeat(6) {
job.add(
launch {
val res = async {
func1()
}
x += res.await()
}
)
}
job.joinAll()
}
ans.await()
return x
}

使用循环创建列表

如果你想在循环之外创建一个列表,你也可以使用List(size) { computeElement() }而不是repeat(或for),这利用了列表工厂函数:

suspend fun mainFunction(): List<A> {
var x: List<A> = listOf<A>()
val ans = mainScope.async {
val jobs = List(6) {
launch {
val res = async {
func1()
}
x += res.await()
}
}
jobs.joinAll()
}
ans.await()
return x
}

额外异步

没有必要在这里用额外的async包装你的启动,你可以直接在launch上使用你的作用域:

suspend fun mainFunction(): List<A> {
var x: List<A> = listOf<A>()
val jobs = List(6) {
mainScope.launch {
val res = async {
func1()
}
x += res.await()
}
}
jobs.joinAll()
return x
}

async + immediate await

使用async { someFun() },然后立即使用await-这个Deferred结果相当于直接调用someFun()(除非您使用不同的作用域或上下文,您没有在这里为最内部的逻辑这样做)。

所以你可以替换最里面的部分:

val res = async {
func1()
}
x += res.await()

仅通过x += func1(),它给出:

suspend fun mainFunction(): List<A> {
var x: List<A> = listOf<A>()
val jobs = List(6) {
mainScope.launch {
x += func1()
}
}
jobs.joinAll()
return x
}

launch vs async

如果你想要结果,通常使用async而不是launch更实际。当您使用launch时,您必须手动将结果存储在某个地方(这会使您遇到像现在这样的同步问题)。对于async,你得到一个Deferred<T>的值,然后你可以await(),当你有一个Deferred的列表时,当你等待它们的时候就没有同步问题了。

所以前面代码的一般思想是不好的实践,可能会对您不利,因为它需要手动同步。您可以将其替换为:

suspend fun mainFunction(): List<A> {
val deferredValues = List(6) {
mainScope.async {
func1()
}
}
val x = deferredValues.awaitAll()
return x
}

或简单的:

suspend fun mainFunction(): List<A> {
return List(6) {
mainScope.async {
func1()
}
}.awaitAll()
}

手动连接与coroutineScope

这通常是手动join()作业的气味。如果您想等待一些协程完成,更习惯的做法是在coroutineScope { ... }块中启动所有这些协程,该块将挂起,直到所有子协程完成。

这里我们已经用await调用的async替换了join()调用的所有launch,所以这不再适用了,因为我们仍然需要await()延迟的值来获得结果。然而,由于我们已经在一个挂起函数中,我们仍然可以使用coroutineScope而不是像mainScope这样的外部作用域,以确保我们不会泄漏任何协程:

suspend fun mainFunction(): List<A> {
return coroutineScope {
List(6) { async { func1() } }.awaitAll()
}
}

如果您只想同时执行一些任务并在最后获得所有完成结果的列表,您可以这样做:

val jobs = (0..5).map { mainScope.async { func1() } }
val results = jobs.awaitAll()

最新更新