RxJS:为什么内部可观察到的发射是第一位的



我正在使用RxJS 6,我有以下示例问题:

我们希望为指定的bufferTime缓冲元素,但如果在大于bufferTime的某段时间内没有发生任何事情,我们希望第一个元素立即激发。

顺序:

[------bufferTime------]
Input over time:
[1, 2, 3, -------------|---4, 5, 6 ----------------]

Output over time:
[1]-----------------[2,3]---[4]------------------[5,6]

这是让我到达那里的代码:

source$.pipe(
buffer(source$.pipe(
throttleTime(bufferTime, asyncScheduler, {leading: true, trailing: true}),
delay(10) // <-- This here bugs me like crazy though!
)
)

我的问题是关于delay运算符。当我省略它时,缓冲区会用一个空列表触发,因为$source.pipe(throttleTime(...))比缓冲区的步长快。

delay

[------bufferTime------]
Input over time:
[1, 2, 3, -------------|---4, 5, 6 ----------------]

Output over time:
[]------------------[1,2,3]--[]------------------[4,5,6]

有没有办法摆脱delay

问题是,当出现新值时,观察者订阅流的顺序会被保留:第一个订阅的人将首先接收值,然后接收下一个值,依此类推

在v6中,buffer将首先订阅内部流,内部流将首先订阅source$,然后它(缓冲区(将订阅第二位的source$。为了澄清,当一个新值出现时,第一个接收到新值的将是内部可观测值,因为它在buffer之前订阅了源。

在v7中,它被换掉了,所以它应该在没有任何额外工作的情况下工作

作为更清洁的v6解决方案,有一个运营商可以帮助您:subscribeOn。有了这个,您可以指定订阅运营商时使用的调度程序,在您的情况下,通过使用asapScheduler的微任务,它应该可以工作:

source$.pipe(
buffer(source$.pipe(
subscribeOn(asapScheduler),
throttleTime(bufferTime, asyncScheduler, {leading: true, trailing: true}),
)
)

这样,在缓冲区订阅后,对源的内部订阅将被推迟

请注意,如果您使用此实现创建自定义运算符,您可能希望使用重载publish(multicasted$ => ...)来多播源$,否则此代码段将对source$进行2次订阅(可能会发送两次请求?(

尝试以下代码,您可以在StackBlitz中使用它:

const source$: Observable<string> = interval(300).pipe(
map(x => "a" + x),
share()
);
source$.pipe(
exhaustMap(value => {
return concat(
of([value]),
source$.pipe(
bufferTime(1000),
take(1)
)
);
})
).subscribe(x => console.log(x, new Date()));

输出为:

["a0"] 2020-09-21T14:39:49.895Z

["a1", "a2", "a3"] 2020-09-21T14:39:50.899Z

["a4"] 2020-09-21T14:39:51.096Z

["a5", "a6", "a7"] 2020-09-21T14:40:30.967Z

最新更新