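I am new to RxJava and RxAndroid; I only started studying them yesterday. I use a REST API to fetch data from the server, and I rely on two main Rx operators: .map() and .zip().
The following code is one of my Observables: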
    protected static Observable<List<Subdistrict>> subdistrictObservable = Observable.defer(new Func0<Observable<String>>() {
        @Override
        public Observable<String> call() {
            try {
                return Observable.just(new MasterService.MasterSubdistrict().get());
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            } catch (ExecutionException ee) {
                ee.printStackTrace();
            }
            return null;
        }
    })
    .filter(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return s != null;
        }
    })
    .map(new Func1<String, List<Subdistrict>>() {
        @Override
        public List<Subdistrict> call(String s) {
            ObjectMapper mapper = new ObjectMapper();
            List<Subdistrict> subdistricts = null;
            try {
                JsonNode node = mapper.readTree(s);
                node = node.get("json");
                TypeReference<List<Subdistrict>> typeRef = new TypeReference<List<Subdistrict>>() {
                };
                subdistricts = mapper.readValue(node.toString(), typeRef);
            } catch (IOException ie) {
                ie.printStackTrace();
            }
            return subdistricts;
        }
    })
    .subscribeOn(Schedulers.io());
I then zip several Observables together:
    Observable<List<Region>> observableChain = Observable.zip(
            Subdistrict.subdistrictObservable,
            District.districtObservable,
            Province.provinceObservable,
            Region.regionObservable,
            new Func4<List<Subdistrict>, List<District>, List<Province>, List<Region>, List<Region>>() {
                @Override
                public List<Region> call(List<Subdistrict> subdistricts, List<District> districts, List<Province> provinces, List<Region> regions) {
                    List<Region> zipRegions = new LinkedList<Region>();
                    // Do my stuff
                    return zipRegions;
                }
            });
    observableChain
            .subscribeOn(Schedulers.io())
            .subscribe(regionOnNext);
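For anyone reading along: zip only calls its combining function once every source has emitted an item, so if any one source completes without emitting, the combined Observable emits nothing. Below is a minimal plain-Java analogy of that "wait for all inputs" behaviour. It deliberately uses CompletableFuture instead of RxJava so it runs without extra dependencies, and ZipSketch and zipSizes are hypothetical names that are not part of my project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ZipSketch {
    // Combines two independently loaded lists into one value,
    // but only once BOTH inputs have completed (analogous to zip).
    static CompletableFuture<Integer> zipSizes(CompletableFuture<List<String>> first,
                                               CompletableFuture<List<String>> second) {
        return first.thenCombine(second, (a, b) -> a.size() + b.size());
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> districts = new CompletableFuture<>();
        CompletableFuture<List<String>> provinces = new CompletableFuture<>();
        CompletableFuture<Integer> combined = zipSizes(districts, provinces);

        // Only one input is ready, so the combined result is not available yet.
        districts.complete(Arrays.asList("d1", "d2"));
        System.out.println(combined.isDone());

        // Once the second input arrives, the combining function runs.
        provinces.complete(Arrays.asList("p1"));
        System.out.println(combined.join());
    }
}
```

The same reasoning applies to the four-way zip above: the Func4 cannot run until all four source Observables have each produced a list.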
The problem is that .map() in the Observable is never called, and I don't know why.
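Following @ytRino's suggestion, I switched to fromEmitter and it works. However, I am not sure it is correct, so if anyone spots a mistake in the following code, please let me know.
The following code is the working version of the Observable. I added an abstract class called MasterService.Callback, so that when the AsyncTask finishes executing, the result is passed back to the Observable.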
    protected static Observable<List<Subdistrict>> subdistrictObservable = Observable.fromEmitter(new Action1<AsyncEmitter<List<Subdistrict>>>() {
        @Override
        public void call(final AsyncEmitter<List<Subdistrict>> listAsyncEmitter) {
            try {
                new MasterService.MasterSubdistrict(new MasterService.Callback() {
                    @Override
                    public void onSuccess(String s) {
                        List<Subdistrict> subdistricts = null;
                        // Do something
                        listAsyncEmitter.onNext(subdistricts);
                        listAsyncEmitter.onCompleted();
                    }

                    @Override
                    public void onNull() {
                        listAsyncEmitter.onCompleted();
                    }

                    @Override
                    public void onError(Throwable t) {
                        listAsyncEmitter.onError(t);
                    }
                }).execute();
            } catch (Exception e) {
                listAsyncEmitter.onError(e);
            }
        }
    }, AsyncEmitter.BackpressureMode.BUFFER)
    .observeOn(AndroidSchedulers.mainThread());
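For context, here is a rough sketch of the shape the callback class could take. It is a simplified stand-in rather than my actual MasterService.Callback: the three method names match the code above, but CallbackSketch, deliver and record are hypothetical helpers added only to show how a finished request would be routed to exactly one of the three methods.

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackSketch {
    // Hypothetical stand-in for MasterService.Callback:
    // exactly one of these methods is invoked per finished request.
    abstract static class Callback {
        abstract void onSuccess(String body);
        abstract void onNull();
        abstract void onError(Throwable t);
    }

    // Hypothetical helper: routes the outcome of a request to the matching method.
    static void deliver(String body, Throwable failure, Callback callback) {
        if (failure != null) {
            callback.onError(failure);
        } else if (body == null) {
            callback.onNull();
        } else {
            callback.onSuccess(body);
        }
    }

    // Records which callback method fired, so the routing can be inspected.
    static List<String> record(String body, Throwable failure) {
        final List<String> events = new ArrayList<>();
        deliver(body, failure, new Callback() {
            @Override void onSuccess(String s) { events.add("success:" + s); }
            @Override void onNull() { events.add("null"); }
            @Override void onError(Throwable t) { events.add("error:" + t.getMessage()); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record("{\"json\":[]}", null));
        System.out.println(record(null, null));
        System.out.println(record(null, new IllegalStateException("timeout")));
    }
}
```

In the fromEmitter version above, each of these three outcomes maps onto the emitter: onSuccess leads to onNext followed by onCompleted, onNull to onCompleted alone, and onError to the emitter's onError.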