我正试图让Observable
与所有订阅者共享其排放量,这样它就可以订阅((一次。我尝试使用Observable.publish()
,但如果发布的Observable
的订阅者在源Observable
完成订阅,则他们似乎不会收到任何终止消息(onCompleted()
,可能还有onError()
(。这里有一段代码来证明:
static <T> Observer<T> printObserver(String name) {
return new Observer<T>() {
@Override public void onCompleted() {
System.out.println(name + ": onCompleted()");
}
@Override public void onError(Throwable e) {
System.out.println(name + ": onError( " + e + " )");
}
@Override public void onNext(T value) {
System.out.println(name + ": onNext( " + value + " )");
}
};
}
public void testRxPublishConnect() throws Exception {
Observable<Integer> sourceObservable = Observable.range(1, 5);
ConnectableObservable<Integer> sharedObservable = sourceObservable.publish();
sharedObservable.subscribe(printObserver("Observer #1"));
sharedObservable.connect();
sharedObservable.subscribe(printObserver("Observer #2"));
}
这就是打印的内容:
观察员#1:onNext(1(观察员#1:onNext(2(观察员#1:onNext(3(观察员#1:onNext(4(观察员#1:onNext(5(观察者#1:onCompleted((
注意,Observer #2
没有接收到onCompleted()
。我不认为这是想要的行为。我是不是错过了什么?我在RxJava 1.0.8和1.0.14版本中进行了尝试,得到了相同的结果。
尝试.share()
,即.publish().refCount()
。
这是经过设计的。如果您在这种情况下拨打connect()
,您的订阅者将从一开始就收到所有事件。如果一个终止的publish
会立即终止其子订阅服务器,您可能无法观察到值,因为一旦连接,如果没有订阅服务器,publish
就会缓慢地清除其源。
我99%确信这是预期的行为。我不确定RxJava,但在发布&根据我所知的订阅模式,可观察的默认行为是向订阅服务器发布事件,然后忘记它们。这意味着通知不是"反向活动的"(即订阅者对过去发出的事件一无所知(。
此外,来自RxJava文档的可观察合同("多个观察者"部分(:
If a second observer subscribes to an Observable that is already emitting items to a first observer, it is up to the Observable whether it will thenceforth emit the same items to each observer ... There is no general guarantee that two observers of the same Observable will see the same sequence of items
。
Publish的工作原理是构建一个所有订阅者的列表,然后一旦调用connect()
,它就开始为其订阅者列表中的所有订阅者生成数据。这意味着在调用connect之前,必须知道所有的订阅者。以下是如何使用publish()
或可能更可取的publish(Func1<Observable<T>, Observable<R>>)
过载。
已知订阅者数量:发布
Func关闭所有订阅。
observableStream.publish(new Func1<Observable<Integer>, Observable<Integer>>() {
@Override
public Observable<Integer> call(Observable<Integer> subject) {
Observable<Integer> o1 = subject.doOnNext(somework1());
Observable<Integer> o2 = subject.doOnNext(somework2());
return Observable.merge(o1, o2);
}
});
手动呼叫连接和订阅:
ConnectableObservable<Integer> subject = observableStream.publish();
subject.subscribe(somework1());
subject.subscribe(somework2());
subject.connect();
如果你不知道你会有多少订阅者,那么你可以将输入窗口设置为可管理的块,然后在你的Transformer
s集合上发布你的输入。
未知订阅者数量:Window
final Set<Transformer<Integer, String>> transformers = new HashSet<>();
observableStream
.window(100, TimeUnit.MILLISECONDS, 1000)
.flatMap(new Func1<Observable<Integer>, Observable<String>>(){
@Override
public Observable<String> call(Observable<Integer> window) {
return window.publish(new Func1<Observable<Integer>, Observable<String>>() {
@Override
public Observable<String> call(Observable<Integer> publish) {
Observable<Observable<String>> workObservables = Observable.from(transformers)
.map(new Func1<Transformer<Integer, String>, Observable<String>>(){
@Override
public Observable<String> call(Transformer<Integer, String> transformer) {
return publish.compose(transformer);
}});
return Observable.merge(workObservables);
}});
}})
.subscribe();
还有第三种选择。您可以使用observable.cache()
,但这将在内存中保存来自该可观察流的所有输入数据,因此您需要小心如何使用它。在这种情况下,您可能最终会使用窗口来控制缓存主题的边界。