Kotlin flatMapMerge在多次合并相同流时不一致



我有一个应用程序,flatMapMerge是一个共享的流程a来创建流程B,然后我将流程a与流程B结合起来得到最终结果,我collectLatest。有时它输出所有的结果。但是,在其他时候,它会跳过一些输出。下面是一个简单的例子:

val flowA = MutableSharedFlow<Int>(replay = Int.MAX_VALUE)
val flowB = flowA.flatMapMerge {
// In the real app, this flow is created in a complex way
flow {
emit(it + 2)
emit(it + 3)
emit(it + 4)
emit(it + 5)
emit(it + 6)
emit(it + 7)
emit(it + 8)
emit(it + 9)
}
}
flowA.emit(1)
flowA.combine(flowB, ::Pair).collect { (a, b) ->
// I need access to the value of flowA for calculations here
Log.d(TAG, "result A: $a, B: $b")
}

运行1:

result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 9
result A: 1, B: 10

运行2:

result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 10
预期输出:

result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10

我也试过调整重播,但没有效果。我也尝试了collectLatest代替collect,但也没有变化。

我应该使用flatMapMerge以外的东西吗?我尝试了flatMapLatest,但这只输出第一个和最后一个结果。MutableSharedFlow应该是别的什么吗?

我如何得到所有的结果输出每次?

这里的问题是使用组合函数

作为官方文档,合并功能:

返回一个流,它的值是由转换函数生成的组合最近发出的

如果combine()的变换函数内部有一些繁重的计算,则有可能combine()不输出所有结果。

因此,对于获得所有结果,下面的策略工作良好。

val flowA = MutableSharedFlow<Int>()
.onStart { emit(1) }
val flowB = flowA.flatMapMerge {
flow {
for (i in 2..9) {
emit(Pair(it, i))
}
}
}
runBlocking {
flowB.collect { (a, b) ->
println("result A: $a, B: $b")
}
}

输出:

result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10

最新更新