如何在RXJava Android的后台线程上执行长时间的计算



我想在android中使用RXJava在后台线程上执行长计算。计算后,我试图在Recylerview中呈现结果。我使用下面的代码:

Observable.just("true")
                .subscribeOn(Schedulers.io())
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        feedlist.clear();
                        if (eventFeedItems != null && !eventFeedItems.isEmpty()) {
                            for (int i = 0; i < eventFeedItems.size(); i++) {
                                if (eventFeedItems != null && eventFeedItems.get(i) != null
                                        && ((eventFeedItems.get(i).getType() != null && eventFeedItems.get(i).getType().equalsIgnoreCase("EVENT"))
                                        || (eventFeedItems.get(i).getActivityRequestType() != null && eventFeedItems.get(i).getActivityRequestType().equalsIgnoreCase(EventConstants.TRENDING_ACTIVITY)))) {
                                    if (eventFeedItems.get(i).getActivityRequestType() != null && !eventFeedItems.get(i).getActivityRequestType().equalsIgnoreCase("")) {
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), eventFeedItems.get(i).getActivityRequestType(), null));
                                    } else if (eventFeedItems.get(i).getRequestType() != null && !eventFeedItems.get(i).getRequestType().equalsIgnoreCase("")) {
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), eventFeedItems.get(i).getRequestType(), null));
                                    } else
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), EventConstants.ATTENDEE_POST, null));
                                }
                            }
                        }
                        Log.d("calculations","Completed");
                        return "";
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
//                        feed_list.setLayoutManager(mLayoutManager);
//                        feedListAdapter.notifyDataSetChanged();
                        Log.d("Adapter", "Set");
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.d("Exception", "oh! fish...");
                        throwable.printStackTrace();
                    }
                });

在上面的代码中,我面临着UI障碍,因为ArrayList eventFeedItems的大小大约超过300个项目。我是RXJava的新手。

使用map-Operator将无法实现并发。

第一个subscribeOn将把所有的排放移动到io调度程序。这里没有并发。

.subscribeOn(Schedulers.io())

映射操作符将从前一个线程同步调用。在您的情况下,它可能是io线程池中的某个线程。

.map(new Func1<String, String>() {

map-Operator被执行后,你将使用

将值从io线程移动到Android-UI-event-loop
.observeOn(AndroidSchedulers.mainThread())

值从io线程转换到ui线程后,初始可观察对象的下一个值将被处理。

Observable.just("true")

在你的例子中,不会有更多的值,因为你只产生一个值。

为了实现并发,你应该使用flatMap而不是map。并且在flatMap中使用subscribeOn()在另一个线程上创建每个流。

请考虑这个例子,看看并发是如何发生的。每个可观察对象都会一次被订阅,所以最大。测试的时间大约是5秒。如果现在并发发生,则需要1+2+3+4+5秒加上执行时间。

@Test
public void name1() throws Exception {
        Observable<Integer> value = Observable.just(1_000, 2_000, 3_000, 4_000, 5_000)
                .flatMap(i -> Observable.fromCallable(() -> doWork(i)).subscribeOn(Schedulers.io())
                ).doOnNext(integer -> System.out.println("value"));
        value.test().awaitTerminalEvent();
}
private int doWork(int sleepMilli) {
        try {
            Thread.sleep(sleepMilli);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return -1;
}

如果你想了解更多关于flatMap的并发性,请考虑阅读http://tomstechnicalblog.blogspot.de/2015/11/rxjava-achieving-parallelization.html

关于你的代码,我建议:

  • 将接口的匿名实现移动到私有内部类实现中,并使用它的实例。你会得到一个可读性更强的observable
  • 不要对内部操作符的全局变量使用副作用。你如果涉及并发,将会出现竞争条件。

    List<FeedsListModel> eventFeedItems = Arrays.asList(new FeedsListModel(), new FeedsListModel());
    Observable<FeedsListModel> feedsListModelObservable = Observable.fromIterable(eventFeedItems)
                        .flatMap(feedsListModel -> Observable.fromCallable(() -> calculation(feedsListModel))
                                .subscribeOn(Schedulers.computation())
    );
    feedsListModelObservable
            .toList()
            .observeOn(Schedulers.io())
            .subscribe(feedsListModels -> {
                // do UI stuff
    });
    

Helping-method:

private FeedsListModel calculation(FeedsListModel model) {
    // do calculation here
    return new FeedsListModel();
}

相关内容

  • 没有找到相关文章

最新更新