RxJava:如何后处理项目的子集,但将完整列表返回给调用方



我正在编写一个与远程服务器通信的 REST 客户端,需要做类似的事情:

  • 从远程服务器检索项目列表
  • 将收到的列表与本地数据进行比较
  • 对于已在本地修改的任何项目(可能没有修改(,请将更改提交到服务器
  • 最后,将项目的完整列表返回给调用方

我正在使用RxJava(2(,我在实现方面有点挣扎。我认为它应该或多或少是这样的(伪代码(:

public Single<List<Item>> synchronize()
{
return server.fetchItems().flatMap(receivedItemList -> {
List<Single<Item>> syncList = new List<>();
for (Item item : receivedItemList) {
if (item.needsSync()) {
Single<Item> modifiedItem = server.updateItem(item);
syncList.add(modifiedItem);
}    
}
if (!syncList.empty()) {
return Single.zip(syncList,
items -> {
// Changes have been sync'ed,
// return full list to caller
return receivedItemList;
});
} else {
return Single.just(receivedItemList);
}
});
}

我不确定这是否是正确的方法:例如,zipper 函数将项目返回完整列表给调用者 - 我使用 zip 主要是为了确保在完成操作之前完成所有更新。此外,我在一种情况下调用 Single.zip((,在另一种情况下调用 Single.just(( 的事实感觉有点错误。

这看起来正确吗?这是使用RxJava执行此操作的惯用方法吗?有没有更好的方法?

你已经接近 Rx 的方式来做到这一点,但是对代码的一些更改可能会增强它一点,所以关于你的代码的一些注释,我将遵循我的谦虚实现。

  • 每次您想在 Rx 中使用 for 循环时,请尝试fromIterable()运算符,该运算符逐项发出,以便您可以像过滤一样对其执行逻辑。
  • Rx 中有很多过滤运算符,因此您不必在流中编写检查。 最流行的一种是filter(),它将谓词作为参数并 rturn 一个可观察的类型。
  • 如果你想在一个列表中收集排放量,有一个叫做toList()的运算符来排放整个列表。

现在到实施

public Observable<List<String>> fetchNetworkItems() {
return Observable.just(Arrays.asList("A", "B", "C")); //maybe retrofit
}
public Observable<List<String>> localItems() {
return Observable.just(Arrays.asList("A", "B", "C")); //maybe Room
}
public Completable updateRemoteItem(String localItem) {
return Completable.create(emitter -> {
//update logic if things are going well use emitter.onComplete();
//catch any exception  or errors --> use emitter.onError() to throw it
});
}
public Single<List<String>> updateRemoteItems() {
return fetchNetworkItems().flatMap(Observable::fromIterable) //iterate over the remote items
.flatMap(remoteItem -> localItems().flatMap(Observable::fromIterable) //iterate over the local items
.filter(localItem -> !localItem.equals(remoteItem)) //decide which one need to update
.flatMap(localItem -> updateRemoteItem(localItem).andThen(Observable.just(localItem))) //update then get the local item
.defaultIfEmpty(remoteItem) //since no need to update it doesn't matter which item we return, we have access to the remote
).toList();
}

借助fromIterable()flatMap()以及过滤运算符,您可以以功能方式实现嵌套循环逻辑。

编辑:以下实现,如果要更新中的所有项目 同时,请注意,如果一个 API 调用失败,zip()则会导致所有 其他调用失败。

public Observable<List<String>> fetchNetworkItems() {
return Observable.just(Arrays.asList("A", "B", "C")) //maybe retrofit
.subscribeOn(io.reactivex.schedulers.Schedulers.io());//do work in background
}
public Observable<List<String>> localItems() {
return Observable.just(Arrays.asList("A", "B", "C")) //maybe Room
.subscribeOn(io.reactivex.schedulers.Schedulers.io());//do work in background
}
public Observable<String> updateRemoteItem(String localItem) { //may be retrofit
return Observable.just(localItem)
.subscribeOn(io.reactivex.schedulers.Schedulers.io());//do work in background
}
public List<Observable<String>> generateApisCall(List<String> remotesToBeUpdated) {
return Observable.fromIterable(remotesToBeUpdated)
.map(this::updateRemoteItem)
.toList()
.blockingGet();
}
public Single<List<String>> getItemsToUpdate() {
return fetchNetworkItems()
.doOnNext(strings -> {/*may be save the list to access later*/}) //side effects is not a good thing, but that's best what I thought for now.
.flatMap(Observable::fromIterable) //iterate over the remote items
.flatMap(remoteItem -> localItems().flatMap(
Observable::fromIterable) //iterate over the local items
.filter(localItem -> (localItem/*.id*/ == remoteItem/*.id*/) /* && remoteItem.old()*/)
//decide which one need to update
).toList();
}
public void update() {
getItemsToUpdate()
.subscribeOn(io.reactivex.schedulers.Schedulers.io())
.observeOn(io.reactivex.android.schedulers.AndroidSchedulers.mainThread())
.subscribe((strings, throwable) -> {
Observable.zip(generateApisCall(strings), objects -> "success").subscripe(string -> {}, throwable -> {});
});
}

这更像是一个伪代码,所以尝试测试它,这里的类型Item替换为类型String

相关内容

  • 没有找到相关文章

最新更新