我有一个奇怪的用例,需要跟踪以前发出的所有事件。
多亏了ReplaySubject,它到目前为止运行得很好。在每一个新订阅者上,此主题都会重新发出以前的每一个事件。
现在,对于一个特定的场景,我需要能够只给出最新发布的事件(有点像BehaviorSubject(,但保持源事件相同。
以下是我试图实现的一个片段:stackblitz
import { ReplaySubject, BehaviorSubject, from } from "rxjs";
class EventManager {
constructor() {
this.mySubject = new ReplaySubject();
}
publish(value) {
this.mySubject.next(value);
}
fullSubscribe(next, error, complete) {
return this.mySubject.subscribe(next, error, complete);
}
subscribe(next, error, complete) {
return this.mySubject.pipe(/* an operator to get the last one on new subscribe */).subscribe(next, error, complete);
}
}
const myEventManager = new EventManager();
myEventManager.publish("Data 1");
myEventManager.publish("Data 2");
myEventManager.publish("Data 3");
myEventManager.fullSubscribe(v => {
console.log("SUB 1", v);
});
myEventManager.subscribe(v => {
console.log("SUB 2", v);
});
谢谢
如果跟踪已发布的事件数量,则可以使用skip
:
subscribe(next, error?, complete?) {
return this.mySubject.pipe(
skip(this.publishCount - 1)
).subscribe(next, error, complete);
}
这是StackBlitz演示。
您可以通过操纵BehaviorSubject
来实现类似ReplaySubject
的行为,而不是强制ReplaySubject
表现得像BehaviorSubject
。
import { BehaviorSubject, from, concat } from 'rxjs';
import { scan, shareReplay } from 'rxjs/operators';
class EventManager {
constructor() {
this.mySubject = new BehaviorSubject();
this.allEmittedValues = this.mySubject.pipe(
scan((xs, x) => [...xs, x], []),
shareReplay(1)
);
// Necessary since we need to start accumulating allEmittedValues
// immediately.
this.allEmittedValues.subscribe();
}
dispose() {
// ends all subscriptions
this.mySubject.complete();
}
publish(value) {
this.mySubject.next(value);
}
fullSubscribe(next, error, complete) {
// First, take the latest value of the accumulated array of emits and
// unroll it into an observable
const existingEmits$ = this.allEmittedValues.pipe(
take(1),
concatMap((emits) => from(emits))
);
// Then, subscribe to the main subject, skipping the replayed value since
// we just got it at the tail end of existingEmits$
const futureEmits$ = this.mySubject.pipe(skip(1));
return concat(existingEmits$, futureEmits$).subscribe(
next,
error,
complete
);
}
subscribe(next, error, complete) {
return this.mySubject.subscribe(next, error, complete);
}
}
为什么不在EventManager
上有一个ReplaySubject
和BehaviorSubject
的实例?
import { ReplaySubject, BehaviorSubject, from } from "rxjs";
class EventManager {
constructor() {
this.replaySubject = new ReplaySubject();
this.behaviorSubject = new BehaviorSubject();
}
publish(value) {
this.replaySubject.next(value);
this.behaviorSubject.next(value);
}
fullSubscribe(next, error, complete) {
return this.replaySubject.subscribe(next, error, complete);
}
subscribe(next, error, complete) {
return this.behaviorSubject.subscribe(next, error, complete);
}
}