我在Android上使用RXJava,并试图将多个API调用链接在一起,并在两个API调用完成后做一些事情。我的API调用看起来都类似于所提供的代码示例。基本上是调用API,在onNext中将每条记录写入DB,在所有记录都被写入之后,更新一些缓存。我想异步地触发这两个调用,然后在两者都击中onCompleted之后,然后做其他事情。在RX中正确的方法是什么?我认为我不需要zip,因为我不需要将不同的流捆绑在一起。我想也许合并,但我的两个API调用返回不同类型的Observable。请让我知道。谢谢。
getUsers()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap(Observable::from)
.subscribe(new Subscriber<User>() {
@Override
public void onCompleted() {
updateUserCache();
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "Error loading users", e);
}
@Override
public void onNext(User user) {
insertUserToDB(user);
}
});
getLocations()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap(Observable::from)
.subscribe(new Subscriber<Location>() {
@Override
public void onCompleted() {
updateLocationCache();
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "Error loading Locations", e);
}
@Override
public void onNext(Location location) {
insertLocationToDB(location);
}
});
你想得对。您应该使用zip
运算符。
你的每个函数都应该调用,写数据库,做你需要的一切。zip
的输出函数不同:当它被调用时,你可以确定所有的Observable
已经成功完成->只是完成你的反应流。
创建Observable
列表:
List<Observable<?>> observableList = new ArrayList<>();
observableList.add(
getUsers()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap(Observable::from)
.insertUserToDB(user)
.toList());
observableList.add(
getLocations()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap(Observable::from)
.insertLocationToDB(location)
.toList());
Then zip
all Observable
's:
Observable.zip(observableList, new FuncN<Object, Observable<?>>() {
@Override
public Observable<?> call(Object... args) {
return Observable.empty();
}
}).subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
updateUserCache();
updateLocationCache();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
}
});
这是伪代码,但我希望你能理解。
如果有人需要它,这里是我基于r使用的代码Zagórski建议:
List<Observable<?>> observableList = new ArrayList<>();
observableList.add(
getUsers()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap(Observable::from)
.doOnNext(user->insertUser(user))
.toList()
);
observableList.add(
getLocations()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap(Observable::from)
.doOnNext(location->insertLocation(location))
.toList()
);
Observable.zip(observableList, new FuncN<Object>() {
@Override
public Observable<?> call(Object...args) {
return Observable.empty();
}).subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
updateUserCache();
updateLocationCache();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
}
});
.zip()是正确的方法
你可能想让Retrofit返回Single而不是Observable