想要缓冲一个Observable,直到另一个激发,然后移除缓冲区,用一个订阅正常激发



我正在Angular应用程序中实现一个分析服务。我在服务的构造函数中创建并附加第三方脚本。在加载脚本之前,可能会出现其他服务尝试激发遥测事件的竞争情况。我想在遥测方法上创建一个缓冲区来保存消息,直到脚本加载,然后刷新缓冲区,然后正常推送。

伪代码:

// Global variable set by 3rd party library after it loads
declare let analytics: any;
class AnalyticsService {
isLoaded$ = new BehaviorSubject<boolean>(false);
identify$ = new BehaviorSubject<user>(null);
constructor() {
this.loadScript();
// Here, I want to buffer messages until the 3rd party script initializes
// Once isLoaded$ fires, forEach the queue and pass to the 3rd party library
// Then for every message after, send one at a time as normal like 
// the buffer wasn't there
this.identify$
.pipe(buffer(this.isLoaded$.pipe(skip(1)) // Essentially want to remove this after it fires
.subscribe((user) => analytics.identify(user));
}
loadScript(): void {
const script = document.createElement('script');
script.innerHTML = `
// setup function from 3rd party
`;
document.querySelector('head').appendChild(script);
interval(1000)
.pipe(take(5), takeUntil(this.isLoaded$))
.subscribe(_ => {
if (analytics) this.isLoaded$.next(true);
})
}
identify(user): void {
this.identify$.next(user);
}
}

如果我使用两个订阅,它将像

identify$
.pipe(
takeUntil(this.isLoaded$),
buffer(this.isLoaded$),
).subscribe(events => events.forEach(user => analytics.identify(user)));
identify$
.pipe(
filter(_ => this.isLoaded$.value),
).subscribe(user => analytics.identify(user))

有没有一种方法可以通过一次订阅做到这一点?

这里可能有一种实现您想要的东西的方法:

constructor () {
this.loadScript();
this.identify$
.pipe(
buffer(
concat(
this.isLoaded$.pipe(skip(1), take(1)),
this.identify.pipe(skip(1)),
)
)
)
.subscribe((user) => analytics.identify(user));
}

要点在于

concat(
this.isLoaded$.pipe(skip(1), take(1)),
this.identify$.pipe(skip(1)),
)

因此,我们首先等待isLoaded$发出true,然后订阅this.identify.pipe(skip(1))

加载脚本后,您希望在this.identify$发出时立即继续。这就是我们从buffer的关闭通知程序再次订阅它的原因。基本上,this.identify$现在将有2个订户。第一个是subscribe((user) => analytics.identify(user)),第二个是来自concat(它是buffer的关闭通知程序(的一个。当this.identify$发出时,该值将按的顺序发送给其订阅者。因此,该值最终将被添加到缓冲区中,然后立即传递给链中的下一个订阅者,因为this.identify$的第二个订阅者将同步接收该值。

这里有一种不需要缓冲区的方法。我们将每个值延迟到这一点。isLoaded$发出true。由于它看起来是这样的。isLoaded$是一个BehaviorSubject,一旦它为true,它将立即发出,否则它将延迟值的传递。

identify$.pipe(
delayWhen(() => this.isLoaded$.pipe(
filter(x => x)
)),
).subscribe(user => analytics.identify(user));

更新#1:自定义运算符

这里有一个自定义运算符,它在函数返回true时延迟发射,在函数返回false时发射缓冲值和当前值。

与之前的解决方案不同,这将保持排放顺序。

function delayWhile<T>(fn: (T)=>boolean): MonoTypeOperatorFunction<T>{
return s => new Observable<T>(observer => {
const buffer = new Array<T>();
const sub = s.subscribe({
next: (v:T) => {
if(fn(v)){
buffer.push(v);
}else{
if(buffer.length > 0){
buffer.forEach(observer.next.bind(observer));
buffer.length = 0;
}
observer.next(v);
}
},
complete: observer.complete.bind(observer),
error: observer.error.bind(observer)
})
return {unsubscribe: () => {sub.unsubscribe()}};
});
}

它在这里使用:

identify$.pipe(
delayWhile(() => !this.isLoaded$.value)
).subscribe(user => analytics.identify(user));

相关内容

最新更新