可观察与多个订阅者执行一次



我有一段代码想定期执行,直到所有订阅者都取消订阅。

// This function shall be called *once* per tick,
// no matter the quantity of subscriber.
function doSomething(val) {
console.log("doing something");
return val;
}
observable = Rx.Observable.timer(0, 1000).map(val => doSomething(val));
const first = observable.subscribe(val => console.log("first:", val));
const second = observable.subscribe(val => console.log("second:", val));
// After 1.5 seconds, stop first.
Rx.Observable.timer(1500).subscribe(_ => first.unsubscribe());
// After 2.5 seconds, stop second.
Rx.Observable.timer(2500).subscribe(_ => second.unsubscribe());

JSFiddle

我的预期输出如下所示:

doing something
first: 0
second: 0
doing something
first: 1
second: 1
doing something
second: 2
<nothing more>

但是,当调用两个可观察量时,doSomething函数被调用两次。以下是实际输出:

doing something
first: 0
doing something
second: 0
doing something
first: 1
doing something
second: 1
doing something
second: 2
<nothing more>

我是否做设计错误?有没有办法做到这一点?

你看到的行为是正确的。interval返回的可观察量是冷的。也就是说,在观察者订阅之前不会创建计时器,当观察者订阅时,创建的计时器是专门为该订阅创建的。

您可以使用share运算符实现您期望的行为:

observable = Rx.Observable
.timer(0, 1000)
.map(val => doSomething(val))
.share();

share运算符引用对订阅进行计数,并对多个订阅者可观察到的源进行多播 - 因此只有一个间隔/计时器,在两个订阅者之间共享。

有关详细信息,您可能会发现本文很有用。

实时工作示例。 然后你需要使用Subjects.普通可观察量是单播的(这意味着每个订阅者都拥有Observable的独立执行(。这就是每个观察者被称为你拥有的所有执行链的方式。

observable = Rx.Observable.timer(0, 1000)
.map(val => doSomething(val));

map正在为每个观察者调用。

主题是一种特殊类型的可观察量,允许值进行多播,这意味着您共享可观察量的单行执行。这是一个rxjs6,如果你迷路了管道操作员,请看这里。

首先,获取imports

import { Observable, Subject, timer } from 'rxjs';
import { map, share } from 'rxjs/operators';

然后你有,

const subject = new Subject();
const doSomething = val => {
console.log("doing something");
return val;
}
const observable = timer(0, 1000).pipe(
map(val => doSomething(val)),
).pipe(share());
const first = observable.subscribe(val => console.log("first:", val));
const second = observable.subscribe(val => console.log("second:", val));
const tercer = observable.subscribe(val => console.log("tercer:", val));
// After 1.5 seconds, stop first.
timer(1500).subscribe(_ => first.unsubscribe());
// After 2.5 seconds, stop second.
timer(2500).subscribe(_ => second.unsubscribe());
// After 2.5 seconds, stop second.
timer(2500).subscribe(_ => tercer.unsubscribe());

最新更新