我正在尝试实现一个缓冲区,这样我就可以在ajax请求正在运行时组合事件,这样它们基本上可以批处理在一起,并在ajax请求不在运行时发送。例如,如果我有这样的内容:
const updateQueue$ = new Subject<ISettings>();
// nothing in flight right now so this should go straight to the server
updateQueue$.next({ volume: 30 });
// previous update is still in flight so queue this up
updateQueue$.next({ volume: 40 });
updateQueue$.next({ volume: 50 });
updateQueue$.next({ volume: 60, muted: true });
// original update finally finishes, combine all the latest
// items and send them now which should look something like
// this: { volume: 60, muted: true }
我对此有一个stackblitz,但它不像我想要的那样工作,我不知道为什么https://stackblitz.com/edit/rxjs-vgd9mq
const allowSend$ = new BehaviorSubject<boolean>(true);
const updateQueue$ = new Subject<ISettings>();
updateQueue$
.asObservable()
.pipe(
tap(item => log("item added to updateQueue$", item)),
// It seems to me I have *something* wrong here, but I haven't been able to pin point it. This doesn't appear to ever run.
bufferToggle(
allowSend$.asObservable().pipe(filter(allowSend => !allowSend)),
() =>
allowSend$.asObservable().pipe(
filter(allowSend => allowSend),
tap(() => "closing buffer")
)
),
tap(bufferContents =>
console.log("bufferContents pre filter", bufferContents)
),
filter(buffer => !!buffer.length),
map(bufferContents => {
return Object.assign({}, ...bufferContents);
}),
tap(bufferContents => console.log("combined buffer", bufferContents)),
// Send network request
switchMap(value => {
allowSend$.next(false);
return sendToServer(value as any);
}),
tap(() => allowSend$.next(true))
// Push to subject to allow next debounced value through
// tap(() => allowNext$.next(true))
)
.subscribe(response => {
log("done sending to server");
});
interval(2000)
.pipe(
map(() => {
updateQueue$.next(generateRandomEvent());
})
)
.subscribe();
你可以利用throttle
来减少排放,直到指定的可观察到排放。有两种设置来控制排放行为:
leading
-允许第一次排放通过。trailing
-允许最后一次发射通过(在"指定可观测"之前的最后一次发射;)。
这个想法是使用throttle
来降低排放,然后在http调用完成后,调用.next
上的触发可观察到的,所以油门将恢复允许排放。
我们可以使用scan
将所有的排放累积到一个单一的变化对象中。
那么在简化的形式,这应该为您工作:
resume$ = new Subject<void>(); // used to tell 'throttle' to allow emissions
queue$ = new Subject<number>();
work$ = this.queue$.pipe(
// accumulate all updates into a single update object
scan((acc, cur) => ({ ...acc, ...cur }), {}),
throttle(() => this.resume$, { leading: true, trailing: true }),
concatMap(state => this.updateState(state).pipe(
finalize(() => this.resume$.next())
))
);
由于throttle
是控制排放的,我认为一次只有一个排放会通过,所以我认为无论你选择switchMap
,concatMap
,mergeMap
还是exhaustMap
,行为都不会有任何差异。
查看这个StackBlitz示例。
这里我更新了你的StackBlitz:-)