RxJava 组合了对象列表的数据库创建/更新调用



我目前正在与我的第一个更复杂的 rx 链作斗争,也许你可以帮助我。 我有一个对象列表,我从 api 调用中获得。我需要检查它们是否存在于本地,如果存在,请更新数据集,如果不创建它们。我目前的方法如下:

private fun insertUpdateFromServer(objectsToInsert: List<Model>): Completable {
return Observable.fromIterable(objectsToInsert).flatMapCompletable { atl ->
dao.getByServerId(atl.id).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.switchIfEmpty { obs: SingleObserver<in Model> -> createNewObject(atl) }.flatMapCompletable {
updateObject(atl, it).applySchedulers()
}
}

重要的是,我必须等待所有子任务完成。如果我只有要更新的 objcts,这是有效的,但如果有一个新对象,整个事情将无法完成。

请注意,我只对操作完成感兴趣,发出的对象对我来说无关紧要。函数 "getByServerId" 返回一个 Maybe。

所以我问你是否可以指出我的逻辑错误并将我推向正确的方向,提前感谢!

问题出在以下行上:

switchIfEmpty { obs: SingleObserver<in Model> -> createNewObject(atl) }

Kotlin 的自动 SAM 转换试图通过生成实现SingleSourcevoid subscribeswitchIfEmpty重载来使您的生活更轻松。但是,并非在所有情况下这都有帮助,而是您想要使用的是:

switchIfEmpty(createNewObject(atl))

可以在此处找到有关为什么会发生这种情况的更好解释。

所以我最终得到了一个解决方法,我在这里找到了。

TL;DR:也许会发出 null 并调用 "onComplete"。因此,如果我使用"onSuccess"和"onComplete",我可以将其链接起来。

感兴趣的人的代码:

private fun insertUpdateFromServer(objects: List<Model>): Completable {
return Observable.fromIterable(objects)
.flatMapCompletable { obj ->
dao.getByServerId(obj.id).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.doOnComplete { createNewObject(obj).subscribe() }
.doOnSuccess { updateObject(obj, it).applySchedulers().subscribe() }
.flatMapCompletable { Completable.complete() }
}
}

此表达式等待所有子任务终止,然后完全终止。

最新更新