How can I provide the default value of a BehaviorSubject as an Observable?
So instead of:
// observer will receive the "default", "zero", "one"
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.subscribe(observer);
subject.onNext("zero");
subject.onNext("one");
// observer2 will receive the "one", "two", "three"
subject.subscribe(observer2);
subject.onNext("two");
subject.onNext("three");
I would like to be able to do this:
Observable<Object> defaultValueObservable = Observable.defer(new Func0<Observable<Object>>() {
    @Override public Observable<Object> call() {
        // simulate blocking work
        return Observable.just("blocking-default");
    }
});
// observer will receive the "blocking-default", "zero", "one"
// Wished-for behavior: seed the subject from an Observable (not what create(T) does)
BehaviorSubject<Object> subject = BehaviorSubject.create(defaultValueObservable.first());
subject.subscribe(observer);
subject.onNext("zero");
subject.onNext("one");
// observer2 will receive the "one", "two", "three"
subject.subscribe(observer2);
subject.onNext("two");
subject.onNext("three");
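I understand that the first observer will have to wait while the default value is computed. I'm willing to pay that price.
Basically, I want an in-memory repository. My current implementation idea: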
public class Repository<T> {
    private final BehaviorSubject<T> subject;

    public Repository() {
        subject = BehaviorSubject.create();
    }

    public Observable<T> get() {
        assertUiThread();
        Observable<T> observable = subject.asObservable();
        if (subject.hasValue()) {
            return observable;
        } else {
            return observable.startWith(Observable.defer(new Func0<Observable<T>>() {
                @Override public Observable<T> call() {
                    T t = getTfromBlockingSource();
                    return Observable.just(t);
                }
            }));
        }
    }

    public void update(T t) {
        assertUiThread();
        subject.onNext(t);
    }
}
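To pin down the behavior being asked for, here is a minimal plain-Java model of it, deliberately written without RxJava. The class name LazyDefaultStore and the Supplier-based loader (standing in for getTfromBlockingSource()) are assumptions made for illustration only, and the sketch is single-threaded, matching the assertUiThread() calls above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of the desired semantics; not RxJava and not thread-safe.
final class LazyDefaultStore<T> {
    private final Supplier<T> defaultLoader; // stands in for getTfromBlockingSource()
    private final List<Consumer<T>> observers = new ArrayList<>();
    private T latest;
    private boolean hasValue;

    LazyDefaultStore(Supplier<T> defaultLoader) {
        this.defaultLoader = defaultLoader;
    }

    // Registers an observer and immediately replays the latest value.
    // The (possibly blocking) default is loaded only if nothing is stored yet.
    void subscribe(Consumer<T> observer) {
        if (!hasValue) {
            latest = defaultLoader.get();
            hasValue = true;
        }
        observers.add(observer);
        observer.accept(latest);
    }

    // Stores a new value and pushes it to every registered observer.
    void update(T value) {
        latest = value;
        hasValue = true;
        for (Consumer<T> observer : new ArrayList<>(observers)) {
            observer.accept(value);
        }
    }
}
```

In this model the first subscriber pays for the load, later subscribers start from whatever value is current, and the loader is never called if update() runs before anyone subscribes.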
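I'm not sure I understand your use case, but instead of raw Objects I'd use a BehaviorSubject<Observable<Object>>. It can now generate its first value lazily, and any subsequent "raw" values are wrapped via just(). Subscribers can then apply .onBackpressureBuffer().concatMap(v -> v) to get the values back as a flat sequence, in order.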
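// The default is wrapped in defer(), so it is only computed on subscription
BehaviorSubject<Observable<Object>> hiddenSubject = BehaviorSubject.create(
        Observable.defer(() -> Observable.just("default")));
Subject<Observable<Object>, Observable<Object>> subject =
        hiddenSubject.toSerialized();
// concatMap flattens the inner Observables one at a time, preserving order
Observable<Object> output = subject.onBackpressureBuffer().concatMap(v -> v);
// first subscriber prints "default", then "one" and "two" as they arrive
output.subscribe(System.out::println);
subject.onNext(Observable.just("one"));
subject.onNext(Observable.just("two"));
// second subscriber starts from the latest value and prints "two"
output.subscribe(System.out::println);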
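The idea of storing deferred values rather than raw ones can also be modelled without RxJava, which may make the evaluation order easier to see. This is only a rough sketch under assumptions: DeferredValueSubject and its just() helper are hypothetical names, Supplier plays the role of the inner Observable, and calling get() on delivery plays the role of concatMap(v -> v).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model: the subject holds deferred values, not raw ones.
final class DeferredValueSubject<T> {
    private Supplier<T> latest; // the most recent deferred value
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    DeferredValueSubject(Supplier<T> lazyDefault) {
        this.latest = lazyDefault;
    }

    // Wraps an already-known value, similar in spirit to Observable.just().
    static <T> Supplier<T> just(T value) {
        return () -> value;
    }

    // Replaces the latest deferred value and delivers it, resolved, to everyone.
    void onNext(Supplier<T> deferred) {
        latest = deferred;
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(deferred.get());
        }
    }

    // A new subscriber resolves and receives only the latest deferred value.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        subscriber.accept(latest.get());
    }
}
```

One consequence worth noting in this model: the deferred default is resolved again for every subscriber that arrives before the first onNext(), so if the blocking work should run only once, the result needs to be cached somewhere.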