我有一个方法,getObs()
,它返回一个可观察量,应该由所有调用者共享。但是,当有人调用 getObs()
时,该可观察量可能不存在,并且创建它是一个异步操作,所以我的想法是返回一个占位符可观察量,一旦创建它就会被替换为真正的可观察量。
我的基本尝试是这样的:
var createSubject = new Rx.Subject();
var placeholder = createSubject.switchLatest();
如果调用"getObs(("时真正的可观察量不存在,我可以返回placeholder
。创建真正的可观察量时,我使用 createSubject.onNext(realObservable)
,然后将其传递给switchLatest()
,为任何订阅者解包。
但是,为此目的使用主题和switchLate似乎有些矫枉过正,所以我想知道是否有更直接的解决方案?
如果获取可观察量本身的行为是异步的,则也应将其建模为可观察量。
例如。。。
var getObsAsync = function () {
return Rx.Observable.create(function (observer) {
var token = startSomeAsyncAction(function (result) {
// the async action has completed!
var obs = Rx.Observable.fromArray(result.dataArray);
token = undefined;
observer.OnNext(obs);
observer.OnCompleted();
}),
unsubscribeAction = function () {
if (asyncAction) {
stopSomeAsyncAction(token);
}
};
return unsubscribeAction;
});
};
var getObs = function () { return getObsAsync().switchLatest(); };
如果您想共享该可观察量的单个实例,但不希望在有人实际订阅之前获得可观察量,那么您可以:
// source must be a Connectable Observable (ie the result of Publish or Replay)
// will connect the observable the first time an observer subscribes
// If an action is supplied, then it will call the action with a disposable
// that can be used to disconnect the observable.
// idea taken from Rxx project
Rx.Observable.prototype.prime = function (action) {
var source = this;
if (!(source instanceof Rx.Observable) || !source.connect) {
throw new Error("source must be a connectable observable");
}
var connection = undefined;
return Rx.Observable.createWithDisposable(function (observer) {
var subscription = source.subscribe(observer);
if (!connection) {
// this is the first observer. Connect the underlying observable.
connection = source.connect();
if (action) {
// Call action with a disposable that will disconnect and reset our state
var disconnect = function() {
connection.dispose();
connection = undefined;
};
action(Rx.Disposable.create(disconnect));
}
}
return subscription;
});
};
var globalObs = Rx.Observable.defer(getObs).publish().prime();
现在代码可以在任何地方使用globalObs而不用担心它:
// location 1
globalObs.subscribe(...);
// location 2
globalObs.select(...)...subscribe(...);
请注意,实际上甚至没有人需要调用getObs
,因为您只是设置了一个全局可观察量,当有人订阅时,它将(通过defer
(为您调用getObs
。
您可以在事后使用主题来连接源:
var placeholder = new Subject<YourType>();
// other code can now subscribe to placeholder, best expose it as IObservable
创建源时:
var asyncCreatedObs = new ...;
placeholder.Subscribe(asyncCreatedObs);
// subscribers of placeholder start to see asyncCreatedObs