如何在运行时"merge"可观察量?



我的用例是视图组中的点击侦听器:

  • 每个View发布一个Observable<Click>冷API。

  • 视图组管理视图集合,并提供Observable<ViewClick> API,从子视图发出点击。

我希望每个子视图在添加到复合可观察对象时都被订阅,但前提是复合具有订阅。如果没有,则只有在订阅复合时才应该订阅子元素。

到目前为止,我的解决方案是:
PublishSubject<ViewClick> composite;
Map<View, Subscription> subscriptions;
void addSource(View view, Observable<Click> clicks) {
    Subscription s = clicks.map(click -> new ViewClick(view, click)
     .subscribe(composite::onNext);
    subscriptions.put(view, s);
}
void removeSource(View v) {
    Subscription s = subscriptions.get(v);  
    s.unsubscribe;
    subscriptions.remove(v);  
}
Observable<ViewClick> compositeClicks() {
    return composite;
}

然而,这意味着clicks可观测对象不再是冷的,因为无论compositeClicks是否被订阅,它们都被订阅了。

有更好的方法吗?

你需要一个特殊的"warm" Subject,比如BufferUntilSubscriber,它可以保留源observable,直到单个消费者订阅它。不幸的是,这个类不是官方API的一部分,然而,2.0将有一个官方的UnicastSubject用于同样的目的:

UnicastSubject<Observable<Integer>> inputs = UnicastSubject.create();
Observable<String> p = inputs
        .flatMap(v -> v.map(u -> u.toString()))
        .publish()
        .autoConnect()
        ;
inputs.onNext(Observable.just(1)
    .doOnSubscribe(s -> System.out.println("Subscribed to 1")));
inputs.onNext(Observable.just(2)
    .doOnSubscribe(s -> System.out.println("Subscribed to 2")));
inputs.onNext(Observable.just(3)
    .doOnSubscribe(s -> System.out.println("Subscribed to 3")));
System.out.println("Subscribing to p");
p.subscribe(System.out::println);

你的困难来自于addSource/removeSource不是响应性的,如果你把它们做成Rx流,它们就会消失。

有几种方法:

  • IObservable<IObservable<View>>,其中内部观察对象在激活/添加时将具有单个onNext,在移除时将具有onComplete
  • IObservable<View> x2(一个用于添加,一个用于删除)
  • IObservable<ViewActivationEvent> (tuple with add/remove kind + view instance)

(假设您的点击流可以从视图中提取,否则您需要提供一个元组view+clickStream来代替视图,用于激活部分)

下面是实现流的方式,但所有方法都可以工作:

Subject<View> add,remove;
clicks = add.flatMap(view -> getClickObservable(view).takeUntil(remove.where(v => v == view)))

最新更新