如何在Android Worker中完成Kotlin Flow



我正在调查Kotlin Flow在我当前的Android应用程序中的使用情况

我的应用程序通过改装API调用从远程服务器检索数据。

其中一些API在500个项目页面中返回50000个数据项目。

每个API响应都包含一个HTTP链接头,其中包含"下一页"完整URL。

这些呼叫最多需要2秒钟才能完成。

为了减少所花费的时间,我采用了Kotlin Flow来同时处理每一页同时还进行下一页API调用。

我的流程定义如下:

private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
private val internalWorkWorkState = MutableStateFlow<Response<List<MyPage>>?>(null)
private val workWorkState = internalWorkWorkState.asStateFlow()
private val myJob: Job
init {
myJob = GlobalScope.launch(persistenceThreadPool) {
workWorkState.collect { page ->
if (page == null) {
} else managePage(page!!)
}
}
}

My Recursive函数定义如下,用于获取所有页面:-

private suspend fun managePages(accessToken: String, response: Response<List<MyPage>>) {
when {
result != null -> return
response.isSuccessful -> internalWorkWorkState.emit(response)
else -> {
manageError(response.errorBody())
result = Result.failure()
return
}
}
response.headers().filter { it.first == HTTP_HEADER_LINK && it.second.contains(REL_NEXT) }.forEach {
val parts = it.second.split(OPEN_ANGLE, CLOSE_ANGLE)
if (parts.size >= 2) {
managePages(accessToken, service.myApiCall(accessToken, parts[1]))
}
}
}
private suspend fun managePage(response: Response<List<MyPage>>) {
val pages = response.body()
pages?.let {
persistResponse(it)
}
}
private suspend fun persistResponse(myPage: List<MyPage>) {
val myPageDOs = ArrayList<MyPageDO>()
myPage.forEach { page ->
myPageDOs.add(page.mapDO())
}
database.myPageDAO().insertAsync(myPageDOs)
}

我的许多问题都是

  1. 此代码不会插入我检索的所有数据项

  2. 当检索到所有数据项时,如何完成流程

  3. 检索并持久化所有数据项后,我如何完成GlobalScope作业

更新

通过进行以下更改,我已成功插入所有数据

private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
private val completed = CompletableDeferred<Int>()
private val channel = Channel<Response<List<MyPage>>?>(UNLIMITED)
private val channelFlow = channel.consumeAsFlow().flowOn(persistenceThreadPool)
private val frank: Job
init {
frank = GlobalScope.launch(persistenceThreadPool) {
channelFlow.collect { page ->
if (page == null) {
completed.complete(totalItems)
} else managePage(page!!)
}
}
}

...
...
...
channel.send(null)
completed.await()
return result ?: Result.success(outputData)

我不喜欢依赖CompletableDeferred,有比这更好的方法来知道Flow何时完成了所有事情吗?

您正在寻找流生成器和flow.buffer((:

suspend fun getData(): Flow<Data> = flow {
var pageData: List<Data>
var pageUrl: String? = "bla"
while (pageUrl != null) {
TODO("fetch pageData from pageUrl and change pageUrl to the next page")
emitAll(pageData)
}
}
.flowOn(Dispatchers.IO /* no need for a thread pool executor, IO does it automatically */)
.buffer(3)

你可以像普通的Flow一样使用它,迭代等等。如果你想知道输出的总长度,你应该用一个可变的闭包变量在使用者身上计算它。请注意,您不应该在任何地方(最好是任何时候(使用GlobalScope。

有几种方法可以实现所需的行为。我建议使用专门为并行分解设计的协程scope。它还提供了良好的取消和开箱即用的错误处理行为。结合Channel.close行为,它使实现变得非常简单。从概念上讲,实现可能是这样的:

suspend fun fetchAllPages() {
coroutineScope {
val channel = Channel<MyPage>(Channel.UNLIMITED)
launch(Dispatchers.IO){ loadData(channel) }
launch(Dispatchers.IO){ processData(channel) }
}
}
suspend fun loadData(sendChannel: SendChannel<MyPage>){
while(hasMoreData()){
sendChannel.send(loadPage())
}
sendChannel.close()
}
suspend fun processData(channel: ReceiveChannel<MyPage>){
for(page in channel){
// process page
}
}

它的工作方式如下:

  1. coroutineScope将挂起,直到所有子项都完成为止。所以你不再需要CompletableDeferred
  2. loadData()在循环中加载页面并将它们发布到通道中。它会在加载完所有页面后立即关闭通道
  3. processData从通道中逐个取出项目并对其进行处理。一旦处理完所有项目(并且通道已关闭(,循环就会结束

在这个实现中,生产者协同程序独立工作,没有背压,因此如果处理缓慢,可能会占用大量内存。限制缓冲区容量,使生产者协同程序在缓冲区已满时挂起。使用通道扇出行为来启动多个处理器以加快计算速度可能也是一个好主意。

最新更新