我正在使用RxJS 6,我有以下示例问题:
我们希望为指定的bufferTime
缓冲元素,但如果在大于bufferTime
的某段时间内没有发生任何事情,我们希望第一个元素立即激发。
顺序:
[------bufferTime------]
Input over time:
[1, 2, 3, -------------|---4, 5, 6 ----------------]
Output over time:
[1]-----------------[2,3]---[4]------------------[5,6]
这是让我到达那里的代码:
source$.pipe(
buffer(source$.pipe(
throttleTime(bufferTime, asyncScheduler, {leading: true, trailing: true}),
delay(10) // <-- This here bugs me like crazy though!
)
)
我的问题是关于delay
运算符。当我省略它时,缓冲区会用一个空列表触发,因为$source.pipe(throttleTime(...))
比缓冲区的步长快。
无delay
[------bufferTime------]
Input over time:
[1, 2, 3, -------------|---4, 5, 6 ----------------]
Output over time:
[]------------------[1,2,3]--[]------------------[4,5,6]
有没有办法摆脱delay
?
问题是,当出现新值时,观察者订阅流的顺序会被保留:第一个订阅的人将首先接收值,然后接收下一个值,依此类推
在v6中,buffer
将首先订阅内部流,内部流将首先订阅source$
,然后它(缓冲区(将订阅第二位的source$
。为了澄清,当一个新值出现时,第一个接收到新值的将是内部可观测值,因为它在buffer
之前订阅了源。
在v7中,它被换掉了,所以它应该在没有任何额外工作的情况下工作
作为更清洁的v6解决方案,有一个运营商可以帮助您:subscribeOn
。有了这个,您可以指定订阅运营商时使用的调度程序,在您的情况下,通过使用asapScheduler
的微任务,它应该可以工作:
source$.pipe(
buffer(source$.pipe(
subscribeOn(asapScheduler),
throttleTime(bufferTime, asyncScheduler, {leading: true, trailing: true}),
)
)
这样,在缓冲区订阅后,对源的内部订阅将被推迟。
请注意,如果您使用此实现创建自定义运算符,您可能希望使用重载publish(multicasted$ => ...)
来多播源$,否则此代码段将对source$
进行2次订阅(可能会发送两次请求?(
尝试以下代码,您可以在StackBlitz中使用它:
const source$: Observable<string> = interval(300).pipe(
map(x => "a" + x),
share()
);
source$.pipe(
exhaustMap(value => {
return concat(
of([value]),
source$.pipe(
bufferTime(1000),
take(1)
)
);
})
).subscribe(x => console.log(x, new Date()));
输出为:
["a0"] 2020-09-21T14:39:49.895Z
["a1", "a2", "a3"] 2020-09-21T14:39:50.899Z
["a4"] 2020-09-21T14:39:51.096Z
["a5", "a6", "a7"] 2020-09-21T14:40:30.967Z