RxJava计数(),当两个可观察到的值之一未完成时



trigger未完成时,我想从streamObservable计数项目。当出现trigger时,我想按stream的大小更新视图。只要trigger未完成Consumer未调用accept()。我该如何解决?

Observable<Long> trigger = Observable.interval(2000L, TimeUnit.MILLISECONDS);
Observable<Long> stream = trigger
.flatMap(new Function<Long, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Long aLong) throws Exception {
return Observable.just("A", "B", "C"); //completed observable
}
})
.count()
.toObservable();
stream.subscribe(new Consumer<Long>() {
@Override
public void accept(Long size) throws Exception {
Log.e("Elements: ", size.toString());
}
});

使用scan:进行滚动计数

Observable<Long> trigger = Observable.interval(2000L, TimeUnit.MILLISECONDS);
Observable<Long> stream = trigger
.flatMap(new Function<Long, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Long aLong) throws Exception {
return Observable.just("A", "B", "C"); //completed observable
}
})
.scan(0L, new BiFunction<Long, String, Long>() {
@Override public Long apply(Long a, String b) {
return a + 1;
}
})
;
stream.subscribe(new Consumer<Long>() {
@Override
public void accept(Long size) throws Exception {
Log.e("Elements: ", size.toString());
}
});

最新更新