在RxJAVA中,我们可以向新订阅者重播旧条目。我想知道是否有一种机制,我可以重播参数数量的条目给订阅者。
Subscriber<String> firstSubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable e) {
// TODO Auto-generated method stub
}
@Override
public void onNext(String t) {
System.out.println("First Subscriber gets -> " + t);
}
};
Subscriber<String> secondSubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable e) {
// TODO Auto-generated method stub
}
@Override
public void onNext(String t) {
System.out.println("Second Subscriber gets -> " + t);
}
};
ReplaySubject<String> replaySubject = ReplaySubject.create();
replaySubject.onNext("one");
replaySubject.onNext("two");
replaySubject.onNext("three");
replaySubject.onNext("four");
replaySubject.subscribe(firstSubscriber);
replaySubject.onNext("five");
replaySubject.onNext("six");
replaySubject.subscribe(secondSubscriber);
结果:
- 第一个订阅者获得->一
- 第一个订阅服务器获得->二
- 第一个订阅服务器获得->三
- 第一个订阅服务器获得->四
- 第一个订阅服务器获得->五
- 第一个订阅服务器获得->六
- 第二个订阅服务器得到->一
- 第二个订阅服务器得到->二
- 第二个订阅服务器得到->三
- 第二个订阅服务器得到->四
- 第二个订阅服务器得到->五
- 第二个订阅服务器得到->六
我想做的是获取条目的参数数量。例如,firstSubscriber在订阅时将获得最新的2个条目
如果你想要完全动态的有限重播方式,你可以推迟订阅并检查ReplaySubject
当前有多少项目,然后跳过你不想要的:
public static <T> Observable<T> replayDynamicSize(ReplaySubject<T> subject, int n) {
return Observable.defer(() -> {
int s = Math.max(0, subject.size() - n);
return skip > 0 ? subject.skip(s) : subject;
});
}
ReplaySubject<String> replaySubject = ReplaySubject.create();
Observable<String> four = replayDynamicSize(replaySubject, 4);
Observable<String> two = replayDynamicSize(replaySubject, 2);
replaySubject.onNext("one");
replaySubject.onNext("two");
replaySubject.onNext("three");
replaySubject.onNext("four");
four.subscribe(firstSubscriber);
replaySubject.onNext("five");
replaySubject.onNext("six");
two.subscribe(secondSubscriber);
这里的一个考虑因素是,如果您不打算重播超过一定数量的值,ReplaySubject
应该由自己来约束,以不保留所有可能的值。