RxJava collect() & takeUntil()



我有一个未知大小的用户列表。我想要的是查询前30和更新UI。然后我想用100步的偏移量查询所有其他人,直到我得到最后一批用户-我应该在这里使用takeUntil吗?),当我得到- i通过添加剩余的用户来更新UI(与reduce()相结合,我相信)。

这是我的代码:

final int INITIAL_OFFSET = 0;
final int INITIAL_LIMIT = 30;
// Loading first 30 users to immediately update UI (better UX)
getServerApi().getAllFriends(userId, "photo_50", INITIAL_OFFSET, INITIAL_LIMIT)
        // Loading remaining users 100 by 100 and updating UI after all users been loaded
        .flatMap(users -> {
            AtomicInteger newOffset = new AtomicInteger(INITIAL_LIMIT);
            return Observable.just(users)
                    .flatMap(users1 -> getServerApi().getAllFriends(userId, "photo_50", newOffset.get(), Config.DEFAULT_FRIEND_REQUEST_COUNT))
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .collect(() -> new ArrayList<User>(), (b, s) -> {
                        b.addAll(s);
                        newOffset.set(newOffset.get() + Config.DEFAULT_FRIEND_REQUEST_COUNT);
                    })
                    .repeat()
                    .takeUntil(friends -> friends.size() == 0);
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(users -> getView().appendAllFriends(users),
                throwable -> getView().setError(processFail(throwable, ServerApi.Action.GET_ALL_FRIENDS), false));

但似乎我做错了,因为onNext是调用每次改造调用

回答我自己的问题。Adels的答案很好,但我需要有一个单一的订阅(我使用Nucleus MVP库),我想使用collect()和takeUntil()而不是while循环(这需要阻塞改造接口方法)。

花了几个小时终于得到了:

final int INITIAL_LIMIT = 30;
// Loading first 30 users to immediately update UI (better UX)
getServerApi().getAllFriends(userId, "photo_50", null, INITIAL_LIMIT)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // Updating UI 1st time or show error
        .doOnNext(users -> getView().appendAllFriends(users))
        .doOnError(throwable -> getView().setError(processFail(throwable, ServerApi.Action.GET_ALL_FRIENDS), false))
        // Loading remaining users 100 by 100 and updating UI after all users been loaded
        .flatMap(users -> {
            AtomicInteger newOffset = new AtomicInteger(INITIAL_LIMIT);
            ArrayList<User> remainingUsers = new ArrayList<>();
            AtomicBoolean hasMore = new AtomicBoolean(true);
            return Observable.just(users)
                    .flatMap(users1 -> getServerApi().getAllFriends(userId, "photo_50", newOffset.get(), Config.DEFAULT_FRIEND_REQUEST_COUNT))
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .collect(() -> remainingUsers, (b, s) -> {
                        // Needed for takeUntil
                        hasMore.set(b.addAll(s));
                        newOffset.set(newOffset.get() + Config.DEFAULT_FRIEND_REQUEST_COUNT);
                    })
                    .repeat()
                    .takeUntil(friends -> !hasMore.get())
                    // Grab all items emitted by collect()
                    .last()
                    // Updating UI last time
                    .doOnNext(users2 -> getView().appendAllFriends(users2));
        })
        .subscribe();

也许它会对其他也在使用Nucleus的人有用

// cache() will ensure that we load the first pack only once
Observable<Users> firstPack = firstPack().cache();
// this subscription is for updating the UI on the first batch
firstPack
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(x -> draw(x), e -> whoops(e));
// this subscription is for collecting all the stuff
// do whatever tricks you need to do with your backend API to get the full list of stuff
firstPack
  .flatMap(fp -> rest(fp))
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(x -> allUsers(x), e -> whoops(e));
// I would do this in a simple while loop
Observable<List<User>> rest(List<User> firstPack) {
  return Observable.create(sub -> {
    final List<User> total = firstPack;
    try {
      while (!sub.isUnsubscribed()) {
        final List<User> friends = api.getFriendsBlocking(total.size());
        if (friends.isEmpty()) {
          sub.onNext(total);
          sub.onCompleted();
        } else {
          total.addAll(friends);
        }
      }
    } catch(IOException e) {
      sub.onError(e);
    }
  })
}

相关内容

  • 没有找到相关文章

最新更新