计数、间隔和事件上的反应式扩展缓冲区



我想缓冲发送到服务器的事件。刷新缓冲区的触发器是缓冲区大小已达到、缓冲区周期已达到或窗口已卸载。

我通过创建Subject并使用带有关闭通知程序的buffer来缓冲发送到服务器的事件。我使用race作为关闭通知程序,并使用window.beforeunload事件竞争缓冲期。

this.event$ = new Subject();
this.bufferedEvent$ = this.event$
.buffer(
Observable.race(
Observable.interval(bufferPeriodMs),
Observable.fromEvent(window, 'beforeunload')
)
)
.filter(events => events.length > 0)
.switchMap(events =>
ajax.post(
this.baseUrl + RESOURCE_URL,
{
entries: events,
},
{
'Content-Type': 'application/json',
}
)
);

问题是,我现在如何限制缓冲区的大小。即,当缓冲区有10个项目时,我从不希望它被刷新。

这是我的工作解决方案。添加了额外的console.log()来显示事件的顺序。

唯一有点麻烦的是fullBufferTrigger中的.skip(1),但它是必需的,因为它将在缓冲区满时触发(natch),但bufferedEvent$中的缓冲区在触发之前似乎没有最新的事件。

幸运的是,有了timeoutTrigger,最后一个事件就会发出。如果没有超时,fullBufferTrigger本身将不会发出最终事件。

此外,将buffer更改为bufferWhen,因为前者似乎没有用两个触发器触发,尽管您可以从文档中看到它
脚注对于buffer(race()),比赛只完成一次,因此先到达那里的触发器将被使用,其他触发器将被忽略。相反,bufferWhen(x => race())在每次事件发生时都会进行评估。

const bufferPeriodMs = 1000
const event$ = new Subject()
event$.subscribe(event => console.log('event$ emit', event))
// Define triggers here for testing individually
const beforeunloadTrigger = Observable.fromEvent(window, 'beforeunload')
const fullBufferTrigger = event$.skip(1).bufferCount(2)
const timeoutTrigger = Observable.interval(bufferPeriodMs).take(10)
const bufferedEvent$ = event$
.bufferWhen( x => 
Observable.race(
fullBufferTrigger,
timeoutTrigger
)
)
.filter(events => events.length > 0)
// output
fullBufferTrigger.subscribe(x => console.log('fullBufferTrigger', x))
timeoutTrigger.subscribe(x => console.log('timeoutTrigger', x))
bufferedEvent$.subscribe(events => {
console.log('subscription', events)
})
// Test sequence
const delayBy = n => (bufferPeriodMs * n) + 500 
event$.next('event1')
event$.next('event2')
event$.next('event3')
setTimeout( () => {
event$.next('event4')
}, delayBy(1))
setTimeout( () => {
event$.next('event5')
}, delayBy(2))
setTimeout( () => {
event$.next('event6')
event$.next('event7')
}, delayBy(3))

工作示例:CodePen

编辑:触发缓冲区的另一种方式

由于bufferWhenrace的组合可能有点低效(每次事件发射都会重新开始比赛),因此另一种选择是将触发器合并为一个流,并使用简单的buffer

const bufferTrigger$ = timeoutTrigger
.merge(fullBufferTrigger)
.merge(beforeunloadTrigger)
const bufferedEvent$ = event$
.buffer(bufferTrigger$)
.filter(events => events.length > 0)

使用独立触发器的解决方案让我感到困扰的一件事是,fullBufferTrigger不知道timeoutTrigger何时发出了它的缓冲值之一,所以如果事件序列正确,fullBuffer将在超时后提前触发。

理想情况下,当timeoutTrigger触发时,我们希望fullBufferTrigger重置,但这很难做到

使用bufferTime()

在RxJS v4中有一个运算符bufferWithTimeOrCount(timeSpan, count, [scheduler]),在RxJSv5中,它被汇总为bufferTime()的附加签名(从清晰的角度来看,可以说是一个错误)。

bufferTime<T>(
bufferTimeSpan: number, 
bufferCreationInterval: number, 
maxBufferSize: number, 
scheduler?: IScheduler
): OperatorFunction<T, T[]>;

剩下的唯一问题是如何合并window.beforeunload触发器。查看bufferTime的源代码,它应该在接收onComplete时刷新缓冲区
因此,我们可以通过向缓冲的事件流发送onComplete来处理window.beforeunload

bufferTime的规范没有对onComplete进行明确的测试,但我想我已经成功地将其整合在一起了。

注:

  • 超时设置得很大,以便将其从测试图片中删除
  • 源事件流不受影响,例如,添加了event8,但从不发出,因为窗口在发生之前就被破坏了
  • 要查看输出流而不带beforeunloadTrigger,请注释掉发出onComplete的行。Event7在缓冲区中,但不会发出

测试:

const bufferPeriodMs = 7000  // Set high for this test
const bufferSize = 2
const event$ = new Rx.Subject()
/*
Create bufferedEvent$
*/
const bufferedEvent$ = event$
.bufferTime(bufferPeriodMs, null, bufferSize)
.filter(events => events.length > 0)
const subscription = bufferedEvent$.subscribe(console.log)  
/*
Simulate window destroy
*/
const destroy = setTimeout( () => {
subscription.unsubscribe()
}, 4500)
/*
Simulate Observable.fromEvent(window, 'beforeunload')
*/
const beforeunloadTrigger = new Rx.Subject()
// Comment out the following line, observe that event7 does not emit
beforeunloadTrigger.subscribe(x=> event$.complete())
setTimeout( () => {
beforeunloadTrigger.next('unload')
}, 4400)
/*
Test sequence
Event stream:        '(123)---(45)---6---7-----8--|'
Destroy window:      '-----------------------x'
window.beforeunload: '---------------------y'
Buffered output:     '(12)---(34)---(56)---7'
*/
event$.next('event1')
event$.next('event2')
event$.next('event3')
setTimeout( () => { event$.next('event4'); event$.next('event5') }, 1000)
setTimeout( () => { event$.next('event6') }, 3000)
setTimeout( () => { event$.next('event7') }, 4000)
setTimeout( () => { event$.next('event8') }, 5000)

工作示例:CodePen

最新更新