我得到了以下方法:
operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> = flow {
val jobDomainModelList = mutableListOf<JobDomainModel>()
jobListingRepository.searchJobs(sanitizeSearchQuery(query))
.collect { jobEntityList: List<JobEntity> ->
for (jobEntity in jobEntityList) {
categoriesRepository.getCategoryById(jobEntity.categoryId)
.collect { categoryEntity ->
if (categoryEntity.categoryId == jobEntity.categoryId) {
jobDomainModelList.add(jobEntity.toDomainModel(categoryEntity))
}
}
}
emit(jobDomainModelList)
}
}
它在调用返回Flow<List<JobEntity>>
的search
方法的存储库中进行搜索。然后,对于流中的每个JobEntity
,我需要从DB中获取该作业所属的类别。一旦我有了那个类别和作业,我就可以将作业转换为域模型对象(JobDomainModel
(,并将其添加到列表中,该列表将作为方法的返回对象在流中返回。
我遇到的问题是,从来没有排放过任何东西。我不确定在Kotlin中处理流是否遗漏了什么,但我没有按ID(categoriesRepository.getCategoryById(jobEntity.categoryId)
(获取类别,然后它就可以正常工作,并发出列表。
提前感谢!
我认为问题在于您正在收集无限长的流,因此collect
永远不会返回。在收集之前,您应该使用.take(1)
来获取有限流,或者使用first()
。
DAO返回的流是无限长的。第一个值是进行的第一个查询,但流将永远持续到取消为止。流中的每个项都是在数据库内容更改时进行的新查询。
类似这样的东西:
operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> =
jobListingRepository.searchJobs(sanitizeSearchQuery(query))
.map { jobEntityList: List<JobEntity> ->
jobEntityList.mapNotNull { jobEntity ->
categoriesRepository.getCategoryById(jobEntity.categoryId)
.first()
.takeIf { it.categoryId == jobEntity.categoryId }
}
}
或者,在您的DAO中,您可以制作getCategoryById()
的suspend
函数版本,该版本只返回列表。
如果您的Kotlin协程流因连续近似峰值分配异常而丢失,请从下面的代码中了解
fun test(obj1: Object,obj2: Object) = flow {
emit(if (obj1 != null) repository.postObj(obj1).first() else IgnoreObjResponse)
}.map { Pair(it, repository.postObj(obj2).first()) }