帮助编写多个网络调用并将结果累积到Rxjava中。(我在安卓应用程序中使用。)
State
-- List<City> cityList;
City
- cityId;
RestCall 1
Observable<State> stateRequest = restService.getStates();
RestCall 2
Observable<CityDetail> cityRequest = restService.getCityDetail(cityId);
在UI中,我必须在获得每个城市的所有详细信息后显示城市列表,然后在列表视图中显示。我如何实现parllel网络呼叫并积累结果?
我希望所有城市详细信息结果都放在源状态"对象"的列表中。由于状态对象有一些信息也需要被忽略。这可能吗?
stateRequest ???
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<State>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(State result) {
// Get city list and display
}
});
我检查了这个例子,它展示了我们如何压缩多个可观察到的响应。下面的片段显示了3个可观测值的组合。但在我的情况下,我必须并行或顺序地进行20次网络呼叫(我的意思是在后台,但一个接一个)。我该如何做到这一点。有什么帮助或指示吗?
https://gist.github.com/skehlet/9418379
Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
@Override
public Map<String, String> call(String s, Integer integer, Integer integer2) {
Map<String, String> map = new HashMap<String, String>();
map.put("f3", s);
map.put("f4", String.valueOf(integer));
map.put("f5", String.valueOf(integer2));
return map;
}
我认为您的代码可以简化为这样,因为您对zip
运算符的使用接近于对toList
运算符的使用
stateRequest
.subscribe(State state -> {
Observable.from(state.getCityList())
.flatMap(City city -> restService.getCityDetail(city.getId())
.toList()
.subscribe(List<City> cities -> {
state.clear();
state.addAll(cities);
});
});
由于RxJava没有提供throttle
操作符,您可以构建类似的东西:
Observable<City> limiter = Observable.zip(Observable.interval(1, SECONDS), aCity, (i, c) -> c);
使用这个,限幅器是一个可观测到的,每秒将发射一个城市。
因此,对于您的代码,如果您想将调用限制为getCityDetail
,例如:
Observable<Object> limiter = Observable.interval(1, SECONDS);
stateRequest
.subscribe(State state -> {
Observable.zip(limiter, Observable.from(state.getCityList()), (i, c) -> c)
.flatMap(City city -> restService.getCityDetail(city.getId())
.toList()
.subscribe(List<City> cities -> {
state.clear();
state.addAll(cities);
});
});
stateRequest
.flatMap(new Func1<State, Observable<State>>() {
@Override
public Observable<State> call(final State state) {
List<Observable> cityObservablesList = new ArrayList<Observable>();
for(City city: state.getCityList()) {
cityObservablesList.add(restService.getCityDetail(city.getId());
}
Observable cityObservables = Observable.from(cityObservablesList);
return Observables.zip(cityObservables, new FuncN<State>() {
@Override
public State call(Object... args) {
List<City> cityList = state.getCityList();
cityList.clear();
for(Object object: args) {
cityList.add((City)object);
}
return state;
}
})
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<State>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(State result) {
// Get city list and display
}
});
我在zip操作符和Iterable for city list作为第一个参数的帮助下完成了它。但我面临另一个问题。由于zip并行执行作业,10-15个网络调用并行执行,服务器拒绝每秒最大查询错误(QPS-403)。如何指示zip操作员一个接一个地执行任务?
我确实解决了这个问题,在城市观测中添加了一个延迟[延迟(c*200,时间单位:百万秒)]。但这似乎不是一个合适的解决方案。
有什么建议吗?
看看flatMap(Function..,BiFunction..)。也许这就是您所需要的。
statesRepository.getStates()
.flatMap(states-> Observable.fromIterable(states))
.flatMap(
state-> cityRepository.getStateCities(state),
(state, cityList) -> {
state.setCities(cityList);
return state;
})
.subscribe(state-> showStateWithCity(state));