RxJava:如何重置长时间运行的热可观察链



对于我的应用程序的搜索功能,我有一个热可观察链,它做以下事情:

  1. 接受用户输入字符串到EditText (TextChangedEvent)(在mainThread上)
  2. 脱机300ms(在computation线程上)
  3. 显示加载旋转器(mainThread)
  4. 使用该字符串查询SQL数据库(此查询可以从100ms到2000ms不等)(在Schedulers.io()上)
  5. 显示结果给用户(mainThread)

因为步骤3的长度是可变的,所以出现了一个竞争条件,即较不近期的搜索结果显示在较近期的结果之上(有时)。假设用户想要输入chicken,但由于键入速度奇怪,单词的第一部分在整个词之前发出:

  • 先搜索chick,再搜索chicken
  • chick需要1500ms执行,而chicken需要300ms执行。
  • 这将导致chick搜索结果不正确地显示搜索词chicken。这是因为chicken搜索首先完成(只花了300毫秒),其次是chick搜索(1500毫秒)。

我该如何处理这种情况?

  • 一旦用户通过TextChangedEvent触发了一个新的搜索,我就不在乎旧的搜索,即使它还在运行。有没有办法取消以前的搜索?

完整可观察代码:

subscription = WidgetObservable.text(searchText)
                .debounce(300, TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                        //do this on main thread because it's a UI element (cannot access a View from a background thread)
                        //get a String representing the new text entered in the EditText
                .map(new Func1<OnTextChangeEvent, String>() {
                    @Override
                    public String call(OnTextChangeEvent onTextChangeEvent) {
                        return onTextChangeEvent.text().toString().trim();
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        presenter.handleInput(s);
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s != null && s.length() >= 1 && !s.equals("");
                    }
                }).doOnNext(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Timber.d("searching for string: '%s'", s);
                    }
                })
                        //run SQL query and get a cursor for all the possible search results with the entered search term
                .flatMap(new Func1<String, Observable<SearchBookmarkableAdapterViewModel>>() {
                    @Override
                    public Observable<SearchBookmarkableAdapterViewModel> call(String s) {
                        return presenter.getAdapterViewModelRx(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                        //have the subscriber (the adapter) run on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                        //subscribe the adapter, which receives a stream containing a list of my search result objects and populates the view with them
                .subscribe(new Subscriber<SearchBookmarkableAdapterViewModel>() {
                    @Override
                    public void onCompleted() {
                        Timber.v("Completed loading results");
                    }
                    @Override
                    public void onError(Throwable e) {
                        Timber.e(e, "Error loading results");
                        presenter.onNoResults();
                        //resubscribe so the observable keeps working.
                        subscribeSearchText();
                    }
                    @Override
                    public void onNext(SearchBookmarkableAdapterViewModel searchBookmarkableAdapterViewModel) {
                        Timber.v("Loading data with size: %d into adapter", searchBookmarkableAdapterViewModel.getSize());
                        adapter.loadDataIntoAdapter(searchBookmarkableAdapterViewModel);
                        final int resultCount = searchBookmarkableAdapterViewModel.getSize();
                        if (resultCount == 0)
                            presenter.onNoResults();
                        else
                            presenter.onResults();
                    }
                });

使用switchMap代替flatMap。这将导致当你开始一个新查询时,它会丢弃*上一个查询。

*工作原理:

每当外部源可观察对象产生一个新值时,switchMap调用你的选择器来返回一个新的内部可观察对象(本例中是presenter.getAdapterViewModelRx(s))。switchMap然后从之前侦听的内部可观察对象中取消订阅,而订阅到新的。

取消订阅前一个内部可观察对象有两个作用:

  1. 由可观察对象产生的任何通知(值、完成、错误等)都将被静默忽略并丢弃。

  2. 可观察对象将被通知它的观察者已经取消订阅,并且可以选择采取措施取消它所代表的任何异步进程。

你放弃的查询是否真的被取消完全取决于presenter.getAdapterViewModelRx()的实现。理想情况下,它们应该被取消,以避免不必要地浪费服务器资源。但是,即使它们继续运行,上面的第1条也可以防止提前输入代码看到过时的结果。

相关内容

  • 没有找到相关文章

最新更新