BehaviorSubject with defaultValue as Observable



如何将BehaviourSubject作为Observable提供默认值?

所以不是:

// observer will receive the "default", "zero", "one"
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.subscribe(observer);
subject.onNext("zero");
subject.onNext("one");
// observer2 will receive the "one", "two", "three"
subject.subscribe(observer2);
subject.onNext("two");
subject.onNext("three");

可以这样做:

Observable<Object> defaultValueObservable = Observable.defer(new Func0<Observable<Object>>() {
@Override public Observable<Object> call() {
    // simulate blocking work
    return Observable.just("blocking-default");
   }
})
// observer will receive the "blocking-default", "zero", "one"
BehaviorSubject<Object> subject = BehaviorSubject.create(defaultValueObservable.first());
subject.subscribe(observer);
subject.onNext("zero");
subject.onNext("one");
// observer2 will receive the "one", "two", "three"
subject.subscribe(observer2);
subject.onNext("two");
subject.onNext("three");

我理解,第一个观察者会等待默认值的计算。我愿意付这个价钱。

基本上,我想要一个内存存储库。我目前的实现思路:

public class Repository<T> {
    private final BehaviorSubject<T> subject;
    public Repository() {
        subject = BehaviorSubject.create();
    }
    public Observable<T> get() {
        assertUiThread();
        Observable<T> observable = subject.asObservable();
        if (subject.hasValue()) {
            return observable;
        } else {
            return observable.startWith(Observable.defer(new Func0<Observable<T>>() {
                @Override public Observable<T> call() {
                    T t = getTfromBlockingSource();
                    return Observable.just(t);
                }
            }));
        }
    }
    public void update(T t) {
        assertUiThread();
        subject.onNext(t);
    }
}

我不确定我理解你的用例,但不是原始的Object s,我有一个BehaviorSubject<Observable<Object>>,现在可以生成它的第一个值,任何后续的"原始"值现在可以通过just()包装。然后,订阅者可以执行.onBackpressureBuffer().concatMap(v -> v)以恢复顺序。

BehaviorSubject<Observable<Object>> hiddenSubject = BehaviorSubject.create(
    Observable.defer(() -> Observable.just("default")));
Subject<Observable<Object>, Observable<Object>> subject = 
    hiddenSubject.toSerialized();
Observable<Object> output = subject.onBackpressureBuffer().concatMap(v -> v);
output.subscribe(System.out::println);
subject.onNext(Observable.just("one"));
subject.onNext(Observable.just("two"));
output.subscribe(System.out::println);

相关内容

  • 没有找到相关文章

最新更新