Kotlin流量组合



当有一系列并行请求需要收敛到相同的结果时,我不知道如何使用Kotlin流。举个例子,让我们使用这样一个场景,其中我们有两个端点,一个返回userID列表,另一个返回用户是否超过30岁。

fun callEndpoint1(): Flow<List<String>> { 
// Actual call to server returning a list of N userIDs
// For simulation purposes it returns the following
return flowOf(listOf("1", "2", "3", "4", "5"))
}
fun callEndpoint2(userId: String): Flow<Boolean> { 
// Actual call to server returning a boolean
// For simulation purposes it returns the following
return flowOf(userId.toInt() % 2 == 0)
}
fun calculateTotalPeopleOver30(): Flow<Int> {
return callEndpoint1().map{ listIds ->
// wrong approach returning 0, should return 2
var result = 0
listIds.forEach{ id ->
callEndpoint2(id).map { isOver30 ->
if (isOver30) result++
}
}
result
}
}

这种方法是错误的,因为变量result将在有机会存储来自不同并行调用的所有结果之前返回。如果没有机会让端点能够一起处理大量ID,那么解决问题的正确方法是什么?

我找到了一种让它发挥作用的方法,但这只是通过在另一个课堂上泄露我们需要实现的知识,而这不是我想要的。只是为了说明的目的,我把它包括在这里

fun calculateTotalPeopleOver30(): Flow<List<Boolean>> {
return callEndpoint1().map { listIds ->
val result = arrayListOf<Boolean>()
listIds.forEach { id ->
result.add(callEndpoint2(id).first())
}
result.toList()
}
}
fun coroutineLauncher(scope: CoroutineScope) {
scope.launch{
calculateTotalPeopleOver30()
.collect { list ->
println("people over 30 are ${list.filter{it}.size}")
}
}
}

您的示例不起作用,因为Flow在您收集它之前不会做任何事情。这是一个懒惰的构造。这意味着这段代码实际上什么都不做:

listIds.forEach{ id ->
callEndpoint2(id).map { isOver30 ->
if (isOver30) result++
}
}

要解决此问题,您需要收集callEndpoint2(id)返回的Flow。您可以使用count-方法来执行此操作,而不是手动计数。

val result = listIds.map { id ->
callEndpoint2(id)
.count { it }
}.sum()

现在,这还有另一个问题。计数将不会并行执行。它将完成对一个callEndpoint2(id)的结果的计数,然后继续下一个。

为此,您可以将ID列表转换为Flow,并使用flatMapMerge同时调用callEndpoint2flatMapMerge的默认并发性是16,但您可以使用concurrency-参数对此进行配置。生成的代码是:

fun calculateTotal(): Flow<Int> {
return callEndpoint1()
.map { 
it.asFlow()
.flatMapMerge { callEndpoint2(it) }
.count { it }
}
}

最新更新