Rxjava 和工作管理器链接了异步调用



>更新:这就是我的旧 insertIntoDb 方法的样子,它不起作用:

private Completable insertIntoDb(List<ArticleEntity> articleItemEntities) {
return database.articleDao().insertArticles(articleItemEntities)
.observeOn(Schedulers.io());
}

我将其更改为以下内容,现在它可以工作:

private void insertIntoDbNew(List<ArticleEntity> articleItemEntities) {
mCompositeDisposable.add(
database.articleDao().insertArticles(articleItemEntities)
.subscribeOn(Schedulers.io())
.subscribe());
}

我不知道为什么,但现在它可以工作了。确保工作线程在数据库插入完成之前完成,但这似乎不是我以前认为的问题。

更新结束。

我是响应式编程的新手。我的目标是安排一个工作管理器执行 4 个操作,然后使用 RxJava2 返回结果。以下是我要执行的任务

  1. 执行网络 API 调用。
  2. 构建我们从 API 调用中获得的数据。
  3. 将其插入我们的本地房间数据库。
  4. 当一切都完成后,Result.success()信号返回到作业,以便它知道一切正常并且可以终止。

所以我的首选方法看起来像这样。

public Result doWork(){
return api.get("URL") responseData -> structureData(responseData) structuredData -> insertIntoDB(structuredData) -> Result.success()
}

我正在使用RxJava2和RxWorker class。

以下是我当前的解决方案。这是正确的还是我做错了什么?

public class DownloadWorker extends RxWorker {
@Override
public Single<Result> createWork() {
return apiService.download("URL")
.map(response -> processResponse(response))
.doOnSuccess(data -> insertIntoDb(data))
.flatMap(response ->
allComplete()
)
.observeOn(Schedulers.io());
}
Single<Result> allComplete() {
return Single.just(Result.success());
}
}

它的行为就像我想要的那样。它下载数据,构建数据,然后将其插入数据库,然后返回Result.success()。但我不知道我在做什么。我是否按预期使用 RxJava?

这部分也困扰着我:

.flatMap(response -> allComplete())

响应部分是多余的,我可以以某种方式删除它吗?

我对你的代码做了一些改进:

public class DownloadWorker extends RxWorker {
@Override
public Single<Result> createWork() {
return apiService.download("URL")
.map(response -> processResponse(response))
.flatMapCompletable(articleItemEntities -> database.articleDao().insertArticles(articleItemEntities))
.toSingleDefault(Result.success())
.onErrorReturnItem(Result.failure())
.observeOn(Schedulers.io());
}
}

在原始代码中,您可以使用doOnSuccess方法保存数据,这是一种副作用。正如您在评论中提到的,insertIntoDb()方法返回Completable.因此,我将doOnSuccess(data -> insertIntoDb(data))更改为flatMapCompletable(data -> insertIntoDb(data)),这将允许您确保存储数据成功并等待它完成。由于insertIntoDb()方法返回Completable并且createWork()方法必须返回Result,我们现在必须将类型从Completable更改为Single<Result>。因此,我使用了默认返回Result.success()toSingleDefault。此外,我还添加了onErrorReturnItem(Result.failure()),这将允许RxWorker跟踪错误。我希望我的回答有所帮助。

最新更新