Observable.map() is not called



I am new to RxJava and RxAndroid; I only started studying them yesterday. I use a REST API to fetch data from the server, and I rely on two main Rx operators: .map() and .zip().

The following code is one of my Observables:

protected static Observable<List<Subdistrict>> subdistrictObservable = Observable.defer(new Func0<Observable<String>>() {
    @Override
    public Observable<String> call() {
        try {
            return Observable.just(new MasterService.MasterSubdistrict().get());
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        } catch (ExecutionException ee) {
            ee.printStackTrace();
        }
        return null;
    }
})
        .filter(new Func1<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                return s != null;
            }
        })
        .map(new Func1<String, List<Subdistrict>>() {
            @Override
            public List<Subdistrict> call(String s) {
                ObjectMapper mapper = new ObjectMapper();
                List<Subdistrict> subdistricts = null;
                try {
                    JsonNode node = mapper.readTree(s);
                    node = node.get("json");
                    TypeReference<List<Subdistrict>> typeRef = new TypeReference<List<Subdistrict>>() {
                    };
                    subdistricts = mapper.readValue(node.toString(), typeRef);
                } catch (IOException ie) {
                    ie.printStackTrace();
                }
                return subdistricts;
            }
        })
        .subscribeOn(Schedulers.io());

I then zip several of these Observables together:

Observable<List<Region>> observableChain = Observable.zip(
        Subdistrict.subdistrictObservable,
        District.districtObservable,
        Province.provinceObservable,
        Region.regionObservable,
        new Func4<List<Subdistrict>, List<District>, List<Province>, List<Region>, List<Region>>() {
            @Override
            public List<Region> call(List<Subdistrict> subdistricts, List<District> districts, List<Province> provinces, List<Region> regions) {
                List<Region> zipRegions = new LinkedList<Region>();
                // Do my stuff
                return zipRegions;
            }
        });
observableChain
        .subscribeOn(Schedulers.io())
        .subscribe(regionOnNext);
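
One property of .zip() that matters for this chain: it only emits as many items as the source that emits the fewest, so if any one of the four sources completes without emitting (for example because its .filter() dropped a null), the Func4 is never invoked. The following is a minimal plain-Java sketch of that pairing rule on lists. It is not RxJava's implementation, and `ZipSketch` and `zipLists` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

public class ZipSketch {

    // Hypothetical helper: pairs items by index and stops at the shorter input,
    // which is the same counting rule zip applies to its sources.
    static <A, B, R> List<R> zipLists(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        int n = Math.min(left.size(), right.size());
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Both sources have one item: the combiner runs once.
        System.out.println(zipLists(Arrays.asList("a"), Arrays.asList(1), (s, i) -> s + i));
        // One source is empty: the combiner never runs and nothing is emitted.
        System.out.println(zipLists(Arrays.asList("a"), Collections.<Integer>emptyList(), (s, i) -> s + i));
    }
}
```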

The problem is that .map() in the Observable is never called, and I don't know why.

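Following @ytRino's suggestion, I switched to fromEmitter and it works. However, I am not sure it is correct, so if anyone spots a mistake in the code below, please let me know.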

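The following code is the working version of the Observable. I added an abstract class named MasterService.Callback, so that when the AsyncTask finishes executing, the result is passed back to the Observable: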

protected static Observable<List<Subdistrict>> subdistrictObservable = Observable.fromEmitter(new Action1<AsyncEmitter<List<Subdistrict>>>() {
    @Override
    public void call(final AsyncEmitter<List<Subdistrict>> listAsyncEmitter) {
        try {
            new MasterService.MasterSubdistrict(new MasterService.Callback() {
                @Override
                public void onSuccess(String s) {
                    List<Subdistrict> subdistricts = null;
                    // Do something
                    listAsyncEmitter.onNext(subdistricts);
                    listAsyncEmitter.onCompleted();
                }
                @Override
                public void onNull() {
                    listAsyncEmitter.onCompleted();
                }
                @Override
                public void onError(Throwable t) {
                    listAsyncEmitter.onError(t);
                }
            }).execute();
        } catch (Exception e) {
            listAsyncEmitter.onError(e);
        }
    }
}, AsyncEmitter.BackpressureMode.BUFFER)
        .observeOn(AndroidSchedulers.mainThread());
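
For reference, the callback type used above could look roughly like the sketch below. Only the three method names (onSuccess, onNull, onError) come from the code above. The wrapper class `CallbackSketch` and the helpers `deliver` and `record` are hypothetical, and the routing rule (failure first, then null, then success) is an assumption about how the task reports its result, not the actual MasterService code.

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackSketch {

    // Mirrors the three methods overridden in the working version above.
    // In the post this type is nested as MasterService.Callback.
    abstract static class Callback {
        public abstract void onSuccess(String s);
        public abstract void onNull();
        public abstract void onError(Throwable t);
    }

    // Hypothetical helper: what the task might do once it has finished.
    // Assumed order: report a failure first, then a missing result, then success.
    static void deliver(String result, Throwable failure, Callback callback) {
        if (failure != null) {
            callback.onError(failure);
        } else if (result == null) {
            callback.onNull();
        } else {
            callback.onSuccess(result);
        }
    }

    // Records which callback method fired so the routing can be inspected.
    static List<String> record(String result, Throwable failure) {
        final List<String> events = new ArrayList<>();
        deliver(result, failure, new Callback() {
            @Override
            public void onSuccess(String s) {
                events.add("success:" + s);
            }

            @Override
            public void onNull() {
                events.add("null");
            }

            @Override
            public void onError(Throwable t) {
                events.add("error:" + t.getMessage());
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record("{\"json\":[]}", null));
        System.out.println(record(null, null));
        System.out.println(record(null, new IllegalStateException("timeout")));
    }
}
```

With this shape, exactly one callback method fires per result, which matches the emitter code above, where each branch ends in either onCompleted() or onError().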
