rxjs:向可观察流动态添加值的最佳方式是什么



作为学习rxjs的一部分,我一直在使用、from、interval等的创建方法来测试节流阀和减压阀等。

现在我有了一个真实的用例,我需要将值动态添加到一个空的可观察流中。如果不使用上面的创建方法,我找不到任何关于如何最好地做到这一点的例子。目前,我正在使用BehaviourSubject使用next((将项目动态添加到流中。这是向流动态添加新项目的最佳/首选方式吗?

例如

import { BehaviorSubject, timer } from 'rxjs';
import { tap, mapTo, concatMap, } from 'rxjs/operators';
const subject = new BehaviorSubject(1);
const example = subject.pipe(
concatMap(ev => timer(200).pipe(mapTo(ev))),
tap((ev) => console.log(ev))
)
example.subscribe();
// add a flurry of values dynamically
subject.next(2);
subject.next(3);
subject.next(4);
// some time later add some more
setTimeout(function(){ 
subject.next(5);
subject.next(6);
subject.next(7);
}, 5000);

https://stackblitz.com/edit/rxjs-behaviorsubject-simpleexample-gyrtw8?file=index.ts

感谢

如果您有一个添加应在Observable中发出的值的强大自定义逻辑,您可以创建自己的(而不是使用fromEvent, of, from, ...(:


const myObservable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.next(5);

setTimeout(() => {
subscriber.next(6);
}, 2000);
}, 1000);
});

然而,rxjs的创建函数应该能满足99%的需求。上面的代码也可以写成:

concat(
of(1,2,3),
of(4,5).pipe(
delay(1000)
),
of(6).pipe(
delay(2000)
)
)

UPD:关于受试者

Subject也是一个Observable,所以在您的情况下,使用Subject是适用的,但可能不是最好的选择。主题的想法是可以有不止一个订阅者(使用主题价值观的订阅者(,但我不确定这是否是你的情况(顺便说一句,你可以提供你的真实例子来帮助我们了解你想要实现的目标(

最新更新