在多个可观察对象完成后执行一些操作



我在Android上使用RXJava,并试图将多个API调用链接在一起,并在两个API调用完成后做一些事情。我的API调用看起来都类似于所提供的代码示例。基本上是调用API,在onNext中将每条记录写入DB,在所有记录都被写入之后,更新一些缓存。我想异步地触发这两个调用,然后在两者都击中onCompleted之后,然后做其他事情。在RX中正确的方法是什么?我认为我不需要zip,因为我不需要将不同的流捆绑在一起。我想也许合并,但我的两个API调用返回不同类型的Observable。请让我知道。谢谢。

    getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .flatMap(Observable::from)
            .subscribe(new Subscriber<User>() {
                @Override
                public void onCompleted() {
                   updateUserCache();
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "Error loading users", e);
                }
                @Override
                public void onNext(User user) {
                    insertUserToDB(user);
                }
            });
    getLocations()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .flatMap(Observable::from)
            .subscribe(new Subscriber<Location>() {
                @Override
                public void onCompleted() {
                   updateLocationCache();
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "Error loading Locations", e);
                }
                @Override
                public void onNext(Location location) {
                    insertLocationToDB(location);
                }
            });  

你想得对。您应该使用zip运算符。

你的每个函数都应该调用,写数据库,做你需要的一切。zip的输出函数不同:当它被调用时,你可以确定所有的Observable已经成功完成->只是完成你的反应流。

创建Observable列表:

List<Observable<?>> observableList = new ArrayList<>();
observableList.add(
        getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .flatMap(Observable::from)
            .insertUserToDB(user)
            .toList());
observableList.add(
        getLocations()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .flatMap(Observable::from)
            .insertLocationToDB(location)
            .toList());

Then zip all Observable 's:

Observable.zip(observableList, new FuncN<Object, Observable<?>>() {
    @Override
    public Observable<?> call(Object... args) {
        return Observable.empty();
    }
}).subscribe(new Subscriber<Object>() {
    @Override
    public void onCompleted() {
        updateUserCache();
        updateLocationCache();
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Object o) {
    }
});

这是伪代码,但我希望你能理解。

如果有人需要它,这里是我基于r使用的代码Zagórski建议:

    List<Observable<?>> observableList = new ArrayList<>();
    observableList.add(
            getUsers()
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .flatMap(Observable::from)
                .doOnNext(user->insertUser(user))
                .toList()
    );
    observableList.add(
            getLocations()
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .flatMap(Observable::from)
                    .doOnNext(location->insertLocation(location))
                    .toList()
    );
    Observable.zip(observableList, new FuncN<Object>() {
        @Override
        public Observable<?> call(Object...args) {
            return Observable.empty();
        }).subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                updateUserCache();
                updateLocationCache();
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(Object o) {
            }
        });

.zip()是正确的方法

你可能想让Retrofit返回Single而不是Observable

相关内容

  • 没有找到相关文章

最新更新