当其中一个流发出特定值时,如何取消流的组合



我正在并行处理多个网络请求,并使用Stateflow监视结果。每个网络请求都在一个单独的flow中完成,我使用combine在我的Stateflow上推送最新状态。这是我的代码:

回购类别:

fun networkRequest1(id: Int): Flow<Resource<List<Area>>> =
flow {
emit(Resource.Loading())
try {
val areas = retrofitInterface.getAreas(id)
emit(Resource.Success(areas))
} catch (throwable: Throwable) {
emit(
Resource.Error()
)
)
}
}
fun networkRequest2(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest3(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest4(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity

ViewModel类:

val getDataCombinedStateFlow: StateFlow<Resource<HashMap<String, Resource<out List<Any>>>>?> =
getDataTrigger.flatMapLatest {
withContext(it) { 
combine(
repo.networkRequest1(id: Int),
repo.networkRequest2(id: Int),
repo.networkRequest3(id: Int),
repo.networkRequest4(id: Int)
) { a,
b,
c,
d
->
hashMapOf(
Pair("1", a),
Pair("2",b),
Pair("3", c),
Pair("4", d),
)
}.flatMapLatest {
val progress = it
var isLoading = false
flow<Resource<HashMap<String, Resource<out List<Any>>>>?> {
emit(Resource.Loading())
progress.forEach { (t, u) ->
if (u is Resource.Error) {
emit(Resource.Error(error = u.error!!))
// I want to cancel here, as I no longer care if 1 request fails
return@flow
}
if (u is Resource.Loading) {
isLoading = true
}
}
if (isLoading) {
emit(Resource.Loading())
return@flow
}
if (!isLoading) {
emit(Resource.Success(progress))
}
}
}
}
}.stateIn(viewModelScope, SharingStarted.Lazily, null)

视图类:

viewLifecycleOwner.lifecycleScope.launchWhenCreated() {
viewModel.getDataCombinedStateFlow.collect {
val result = it ?: return@collect
binding.loadingErrorState.apply {
if (result is Resource.Loading) {
//show smth
}
if (result is Resource.Error) {
//show error msg
}
if (result is Resource.Success) {
//done
}
}
}
}

我希望能够在Resource.Error发出后取消所有工作,因为在其他API调用之一失败的情况下,我不再需要等待或执行任何相关工作来响应它们。

我怎样才能做到这一点?

我试图取消收集,但构建Stateflow的流继续工作,并产生emmit结果。我知道它们不会被收集,但我仍然觉得这是浪费资源。

我认为整个情况很复杂,因为源流只是在原本具有Loading状态的挂起函数之前。因此,您必须将它们合并并过滤出各种加载状态,最终结果流会不断重复发出加载状态,直到所有源都准备好为止。

如果您的网络操作具有基本的挂起功能,例如:

suspend fun networkRequest1(id: Int): List<Area> =
retrofitInterface.getAreas(id)

然后,视图模型流变得更简单。仅仅使用特定上下文来调用flow构建器函数是没有意义的,所以我省略了这一部分。(我也很困惑为什么你有一个CoroutineContext流。(

我还认为,如果将请求调用分解为一个单独的函数,会干净得多。

private fun makeParallelRequests(id: Int): Map<String, Resource<out List<Any>> = coroutineScope {
val results = listOf(
async { networkRequest1(id) },
async { networkRequest2(id) },
async { networkRequest2(id) },
async { networkRequest4(id) }
).awaitAll()
.map { Resource.Success(it) }
listOf("1", "2", "3", "4").zip(results).toMap()
}
val dataCombinedStateFlow: StateFlow<Resource<Map<String, Resource<out List<Any>>>>?> =
getDataTrigger.flatMapLatest {
flow {
emit(Resource.Loading())
try {
val result = makeParallelRequests(id)
emit(Resource.Success(result))
catch (e: Throwable) {
emit(Resource.Error(e))
}
}
}

我同意@Tenfour04的观点,即那些嵌套流过于复杂,有几种方法可以简化这一点(@Tenfour04的解决方案很好(。

如果你不想重写所有内容,那么你可以修复破坏结构化并发的那一行:

.stateIn(viewModelScope, SharingStarted.Lazily, null)

这样,整个ViewModel流在ViewModel的作用域中启动,而视图则从一个单独的作用域(viewLifecycleOwner.lifecycleScope,即Fragment/Activity作用域(启动收集。

如果要从视图中取消流,则需要使用相同的作用域或公开一个取消ViewModel作用域的取消函数。

如果您想取消ViewModel本身的流(在return@flow语句中(,那么您可以简单地添加:

viewModelScope.cancel()

相关内容

  • 没有找到相关文章

最新更新