如何将多播 OBS 与行为主体一起使用?



一般来说,我们需要行为主体功能。但只有在第一次订阅时,我们才应该在 REST 中将订阅发送到服务器。并在最后一次取消订阅时发送取消订阅,所有延迟订阅的观察者将 gwt 从第一个取消订阅的最新 json。我可以使用 rxjs 操作器以及如何做到这一点吗?还是我使用自定义obserbale?

目前,此的自定义代码是这样的:

public observable: Observable<TPattern> = new Observable((observer: Observer<TPattern>) => {
this._observers.push(observer);
if (this._observers.length === 1) {
this._subscription = this.httpRequestStream$
.pipe(
map((jsonObj: any) => {
this._pattern = jsonObj.Data;
return this._pattern;
})
)
.subscribe(
(data) => this._observers.forEach((obs) => obs.next(data)),
(error) => this._observers.forEach((obs) => obs.error(error)),
() => this._observers.forEach((obs) => obs.complete())
);
}
if (this._pattern !== null) {
observer.next(this._pattern); // send last updated array
}
return () => {
const index: number = this._observers.findIndex((element) => element === observer);
this._observers.splice(index, 1);
if (this._observers.length === 0) {
this._subscription.unsubscribe();
this._pattern = null; // clear pattern when unsubscribed
}
};

}(;

听起来你需要一个shareReplay(1),它将与所有订阅者共享最新的响应。

const stream$ = httpRequestStream$.pipe(
shareReplay(1),
),
stream$.subscribe(); // sends the request and gets its result
stream$.subscribe(); // doesn't send it but gets cached result
stream$.subscribe(); // doesn't send it but gets cached result
stream$.subscribe(); // doesn't send it but gets cached result

最新更新