如何编写像replay
这样的运算符,重播与谓词匹配的最后一个项目,然后重播所有未来的项目?
例如,如果热可观察量发出A, B, C, D, E, F
并且我的谓词匹配元音,则在C
和D
之间加入的订阅者应该收到A, D, E, F
。
我只是一个鸡蛋,但这是我想出的重播单个项目的方法:
public static <T> Observable<T> replay(final Observable<T> observable,
final Predicate<? super T> predicate) {
final AtomicReference<T> mLastMatch = new AtomicReference<>();
return observable.map(e -> {
if (predicate.test(e)) {
mLastMatch.set(e);
}
return e;
})
.startWith(Observable.defer(() -> {
final T t = mLastMatch.get();
return t == null ? Observable.empty() : Observable.just(t);
}));
}