我有一个应用程序,flatMapMerge
是一个共享的流程a来创建流程B,然后我将流程a与流程B结合起来得到最终结果,我collectLatest
。有时它输出所有的结果。但是,在其他时候,它会跳过一些输出。下面是一个简单的例子:
val flowA = MutableSharedFlow<Int>(replay = Int.MAX_VALUE)
val flowB = flowA.flatMapMerge {
// In the real app, this flow is created in a complex way
flow {
emit(it + 2)
emit(it + 3)
emit(it + 4)
emit(it + 5)
emit(it + 6)
emit(it + 7)
emit(it + 8)
emit(it + 9)
}
}
flowA.emit(1)
flowA.combine(flowB, ::Pair).collect { (a, b) ->
// I need access to the value of flowA for calculations here
Log.d(TAG, "result A: $a, B: $b")
}
运行1:
result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 9
result A: 1, B: 10
运行2:
result A: 1, B: 3
result A: 1, B: 5
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 10
预期输出:result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10
我也试过调整重播,但没有效果。我也尝试了collectLatest
代替collect
,但也没有变化。
我应该使用flatMapMerge
以外的东西吗?我尝试了flatMapLatest
,但这只输出第一个和最后一个结果。MutableSharedFlow
应该是别的什么吗?
我如何得到所有的结果输出每次?
这里的问题是使用组合函数
作为官方文档,合并功能:
返回一个流,它的值是由转换函数生成的组合最近发出的
如果combine()
的变换函数内部有一些繁重的计算,则有可能combine()不输出所有结果。
因此,对于获得所有结果,下面的策略工作良好。
val flowA = MutableSharedFlow<Int>()
.onStart { emit(1) }
val flowB = flowA.flatMapMerge {
flow {
for (i in 2..9) {
emit(Pair(it, i))
}
}
}
runBlocking {
flowB.collect { (a, b) ->
println("result A: $a, B: $b")
}
}
输出:
result A: 1, B: 3
result A: 1, B: 4
result A: 1, B: 5
result A: 1, B: 6
result A: 1, B: 7
result A: 1, B: 8
result A: 1, B: 9
result A: 1, B: 10