RxJava如何接受任何订阅最多5项



在RxJAVA中,我们可以向新订阅者重播旧条目。我想知道是否有一种机制,我可以重播参数数量的条目给订阅者。

Subscriber<String> firstSubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub

}
@Override
public void onError(Throwable e) {
// TODO Auto-generated method stub

}
@Override
public void onNext(String t) {
System.out.println("First Subscriber gets -> " + t);
}
};

Subscriber<String> secondSubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub

}
@Override
public void onError(Throwable e) {
// TODO Auto-generated method stub

}
@Override
public void onNext(String t) {
System.out.println("Second Subscriber gets -> " + t);
}
};

ReplaySubject<String> replaySubject = ReplaySubject.create();
replaySubject.onNext("one");
replaySubject.onNext("two");
replaySubject.onNext("three");
replaySubject.onNext("four");
replaySubject.subscribe(firstSubscriber);
replaySubject.onNext("five");
replaySubject.onNext("six");
replaySubject.subscribe(secondSubscriber);

结果:

  • 第一个订阅者获得->一
  • 第一个订阅服务器获得->二
  • 第一个订阅服务器获得->三
  • 第一个订阅服务器获得->四
  • 第一个订阅服务器获得->五
  • 第一个订阅服务器获得->六
  • 第二个订阅服务器得到->一
  • 第二个订阅服务器得到->二
  • 第二个订阅服务器得到->三
  • 第二个订阅服务器得到->四
  • 第二个订阅服务器得到->五
  • 第二个订阅服务器得到->六

我想做的是获取条目的参数数量。例如,firstSubscriber在订阅时将获得最新的2个条目

如果你想要完全动态的有限重播方式,你可以推迟订阅并检查ReplaySubject当前有多少项目,然后跳过你不想要的:

public static <T> Observable<T> replayDynamicSize(ReplaySubject<T> subject, int n) {
return Observable.defer(() -> {
int s = Math.max(0, subject.size() - n);
return skip > 0 ? subject.skip(s) : subject;
});
}
ReplaySubject<String> replaySubject = ReplaySubject.create();
Observable<String> four = replayDynamicSize(replaySubject, 4);
Observable<String> two = replayDynamicSize(replaySubject, 2);
replaySubject.onNext("one");
replaySubject.onNext("two");
replaySubject.onNext("three");
replaySubject.onNext("four");
four.subscribe(firstSubscriber);
replaySubject.onNext("five");
replaySubject.onNext("six");
two.subscribe(secondSubscriber);

这里的一个考虑因素是,如果您不打算重播超过一定数量的值,ReplaySubject应该由自己来约束,以不保留所有可能的值。

相关内容

  • 没有找到相关文章

最新更新