RXJAVA2:如何改善并行数据下载和缓存



我正在通过rxjava2战斗。我想知道我的解决方案是可以接受的还是有任何改进方法。

Uustecase

  1. 用户按更新数据按钮
  2. 显示了一个对话框 - 请等待
  3. 并行处理几个后端呼叫
  4. 这些完成中的任何一个 - 数据都保存在本地数据库中
  5. 所有请求完成(后端呼叫和持久)后,对话框应驳回

当前解决方案

我有几个Completables看起来像这样:

Completable organisationUnitCompletable = backendService.getOrganisationUnits()
    .doOnNext(data -> organisationUnitDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());
Completable locationCompletable = backendService.getLocations()
    .doOnNext(data -> locationDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());
Completable prioritiesCompletable = backendService.getPriorities()
    .doOnNext(data -> priorityDao.saveInTx(data))
    .ignoreElements()
    .subscribeOn(Schedulers.io());

我通过添加到列表和使用merge操作员来将它们打包到一个。

List<Completable> compatibles = new ArrayList<>();
compatibles.add(organisationUnitCompletable);
compatibles.add(locationCompletable);
compatibles.add(prioritiesCompletable);
Completable.merge(compatibles)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(() -> {
     progressDialog.dismiss();
});

可能的改进

好的,这是按预期工作的。但是有些事情我不太高兴。

我真的必须将subscribeOn(Schedulers.io())添加到每个可完整的吗?没有它,它就无法并行起作用,但是也许有更好的方法可以做到这一点?

所有可完整的行都有这些行。

    .ignoreElements()
    .subscribeOn(Schedulers.io());

有没有办法将其提取为一种方法?我尝试了这样的事情:

private <T> Completable prepareCompletable(Function<Void, Observable<List<T>>> source, AbstractDao<T, Long> dao) {
    Completable orderTypeCompletable = source
            .doOnNext(data -> dao.saveInTx(data))
            .ignoreElements()
            .subscribeOn(Schedulers.io());
}

我只将可观察到的dao放入哪里。当然没有编译。似乎比我已经需要更多的关于仿制药的知识。

对不起一个漫长的问题,很难用几句话来解释整个用途酶。

我是否真的必须将订阅(schedulers.io())添加到每个可完整的?

是的,但是您不需要Completable.merge()

有没有办法将其提取为一种方法?

public static <T> Function<Flowable<T>, Completable> applyIgnore() {
    return f -> f.ignoreElements().subscribeOn(Schedulers.io());
}
Completable locationCompletable = backendService.getLocations()
.doOnNext(data -> locationDao.saveInTx(data))
.to(applyIgnore());

最新更新