主体发射后的 RxJS 执行顺序



我想了解对主题进行"下一个"调用后的代码执行顺序。

背景:我有3个类(称它们为HaveSubject,HaveSubscription1,HaveSubscription2)。HaveSubject 需要告诉 HS1 和 HS2 通过 HS1 和 HS2 订阅的主题执行某些操作。他们的任务必须在HaveSubject继续执行方法之前完成。

伪代码:

class HaveSubject {
// Angular service
public mySubject$: Subject<string> = new Subject<string>();
public codeImExecuting() {
this.mySubject$.next('input for tasks')
this.somethingVeryImportant();
}
private somethingVeryImportant() {
// stuff
}
}
class HaveSubscription1 {
// Angular service
constructor(private hs: HaveSubject) {
this.hs.mySubject$.subscribe(inputStr => {
// did I do this before somethingVeryImportant is called?
});
}
}
class HaveSubscription2 {
// Angular service
constructor(private hs: HaveSubject) {
this.hs.mySubject$.subscribe(inputStr => {
// did I do this before somethingVeryImportant is called?
});
}
}

我的问题是:在继续执行方法之前,确保 HS1 和 HS2 执行附加到其订阅的代码的最佳方法是什么非常重要?如果操作顺序是:HaveSubject 在主题上调用"next"——> HS1 和 HS2 完成它们的任务——> HaveSubject 继续它的下一行代码,这是非常重要的,那么我没有问题。我只是不确定订阅在收到订阅中的"下一个"项后是否会立即执行。

注意:有一些事情我通常不能做,例如将 HaveSubject 注入到其他两个中,因为其他两个是动态创建的(即我可能没有、一个或两个 HaveSubscriptionX,不清楚将创建多少个,这些是由组件提供的 Angular 服务, 不在根...

思潮?

HaveSubscription#中完成边工作的最简单的调用事件(不理想,很多重复运行检查第二个选项)

class HaveSubject {
// Angular service
public mySubject$: Subject<string> = new Subject<string>();
public mySubjectDone$: Subject<void> = new Subject<void>();
public constructor() {
this.mySubjectDone$.subscribe(this.somethingVeryImportant.bind(this));
}
public codeImExecuting() {
this.mySubject$.next('input for tasks')
}
private somethingVeryImportant() {
// stuff
}
}
class HaveSubscription1 {
// Angular service
constructor(private hs: HaveSubject) {
this.hs.mySubject$.subscribe(inputStr => {
// did I do this before somethingVeryImportant is called?
hs.mySubjectDone$.next()
});
}
}
class HaveSubscription2 {
// Angular service
constructor(private hs: HaveSubject) {
this.hs.mySubject$.subscribe(inputStr => {
// did I do this before somethingVeryImportant is called?
hs.mySubjectDone$.next()
});
}
}

或者,如果您不想要求HaveSubscription#触发延迟操作的任何操作

class HaveSubject {
// Angular service
public mySubject$: Subject<string> = new Subject<string>();
public constructor() {
this.mySubject$.pipe(
debounceTime(16) // delay or auditTime, debounceTime, ...
).subscribe(this.somethingVeryImportant.bind(this));
}
public codeImExecuting() {
this.mySubject$.next('input for tasks')
}
private somethingVeryImportant() {
// stuff
}
}
class HaveSubscription1 {
// Angular service
constructor(private hs: HaveSubject) {
this.hs.mySubject$.subscribe(inputStr => {
// did I do this before somethingVeryImportant is called?
});
}
}
class HaveSubscription2 {
// Angular service
constructor(private hs: HaveSubject) {
this.hs.mySubject$.subscribe(inputStr => {
// did I do this before somethingVeryImportant is called?
});
}
}

如果你有一些版本控制机制会对HaveSubscription#中所做的更改做出反应,你可以这样做:

this.mySubject$.pipe(
map(this.calculateVersion),
distinctUntilChanged(),
).subscribe(this.somethingVeryImportant.bind(this));

所以这看起来你已经做出了一些可疑的架构决策,因为流程是

  1. 服务 1 函数执行并从可观察量发出
  2. 服务
  3. 2/3/etc 订阅者到服务 1 可观察的执行代码,产生一些输出或副作用
  4. 服务
  5. 1 函数需要根据服务 2/3/etc 函数的某些输出或副作用执行

这要求主体意识到它的观察者,这与 RXJS 哲学相反。这里理想的解决方案是解决这些架构问题。如果不更多地了解事情如何或为什么以这种方式发生或总体目标是什么,就很难说如何实现这一目标。

但是,你可以以一种非常可靠的方式完成这一点,你需要在你的第一个函数上添加一些主题,这些主题可以指示依赖服务的完成:

class HaveSubject {
// Angular service
public mySubject$: Subject<string> = new Subject<string>();
private workDone$: Subject<void> = new Subject<void>();
imDone() {
this.workDone$.next();
}
public codeImExecuting() {
if (this.mySubject$.observers.length) { // check for observers
// if observers, listen for that many emissions from workDone$, only reacting to the last one
this.workDone$.pipe(take(this.mySubject$.observers.length), last())
.subscribe(v => this.somethingVeryImportant());
} else { // or just run the function, doesn't matter
this.somethingVeryImportant();
}
this.mySubject$.next('input for tasks');
}
private somethingVeryImportant() {
// stuff
}
}

并让 mySubject$ 的观察者在完成后调用 imDone()。

最新更新