rxjs withLatestFrom unExpected behavior


import { of, Subject, interval } from 'rxjs';
import {
tap,
startWith,
map,
delay,
publishReplay,
publish,
refCount,
withLatestFrom,
switchMap,
take
} from 'rxjs/operators';
const a$ = new Subject();
const b$ = a$.pipe(
map(a => {
console.log('expensive calculation');
return a;
}),
publishReplay(),
refCount()
);
const c$ = b$.pipe(withLatestFrom(a$));
b$.subscribe(b => console.log(`b$`));
c$.subscribe(c => console.log(`c$`)); // my problem, why c$ not emit value
of(0).subscribe(a$);

我不知道为什么没有打印"c$"这是我的在线代码,https://stackblitz.com/edit/rxjs-pc5y8d?devtoolsheight=60&file=index.ts

TLDR

正在使用StackBlitz应用程序。


解释性解决方案

我将从一个有趣的观察开始:如果您对订阅b$的行(b$.subscribe(observer('b$'));(进行注释,则将执行c$观察器的next回调:

const a$ = new Subject();
const b$ = a$.pipe(
map(a => {
console.log('expensive calculation');
return a;
}),
publish(),
refCount()
);
function observer(name: string) {
return {
next: (value: number) => {
console.log(`observer ${name}: ${value}`);
},
complete: () => {
console.log(`observer ${name}: complete`);
}
};
}
const c$ = b$.pipe(withLatestFrom(a$));
// b$.subscribe(observer('b$'));
c$.subscribe(observer('c$'));
of(0).subscribe(a$);
/*
Console output:
expensive calculation
observer c$: 0,0
observer c$: complete
*/

另一个观察结果是,如果您更改订阅者的顺序,也会调用相同的next回调:首先订阅c$,然后订阅b$:

const a$ = new Subject();
const b$ = a$.pipe(
map(a => {
console.log('expensive calculation');
return a;
}),
publish(),
refCount()
);
function observer(name: string) {
return {
next: (value: number) => {
console.log(`observer ${name}: ${value}`);
},
complete: () => {
console.log(`observer ${name}: complete`);
}
};
}
const c$ = b$.pipe(
withLatestFrom(a$)
);
c$.subscribe(observer('c$'));
b$.subscribe(observer('b$'));
of(0).subscribe(a$);
/*
Console output:
expensive calculation
observer c$: 0,0
observer b$: 0
observer c$: complete
observer b$: complete
*/

我们将首先了解这些观察背后的原因,然后我们将提出解决方案。

首先,重要的是要知道Subject通过使用订阅者列表来跟踪其订阅者。当主题发出一个值(subject.next()(时,主题的所有订阅者都将根据他们订阅主题的顺序接收该值。

第一个例子中:

const a$ = new Subject();
const b$ = a$.pipe(/* ... */);
const c$ = b$/* subscriber 2 */.pipe(withLatestFrom(a$/* subscriber 1 */));
// b$.subscribe(observer('b$'));
c$.subscribe(observer('c$'));

订阅c$时,它将首先订阅传递给withLatestFrom的每个参数,然后订阅b$。但由于b$是基于a$,这意味着a$主题最终将有2个订阅者
为了使withLatestFrom发出值,必须同时满足两个条件:

  • 其源(在本例中为b$(必须发出一个值
  • 所有withLatestFrom参数(可观测(至少发出一次

在这种情况下,由于唯一的withLatestFrom的参数是首先订阅a$,因此将满足上述所有条件,这就是为什么observer c$: 0,0将打印到控制台的原因。

第二个例子中:

const a$ = new Subject();
const b$ = a$.pipe(/* ... */);
const c$ = b$.pipe(
withLatestFrom(a$)
);
c$.subscribe(observer('c$')); /* b$'s first subscription */
b$.subscribe(observer('b$')); /* b$'s second subscription */
of(0).subscribe(a$);

直到b$.subscribe(observer('b$'));,发生了与我上面描述的相同的事情。b$.subscribe(observer('b$'));所做的是将另一个订阅者添加到Subject实例中,但这次不是a$,而是属于publish()的主题。当第一次订阅发生时,publish将创建一个Subject实例,并将该新订阅者添加到其中,但它也将订阅源(这在内部发生(。同样,这种情况只发生在第一次订阅时。在随后的订阅中,订阅者将被添加到publish维护的主题中
a$主题也将有2个订阅者。一个来自withLatestFrom,一个来自于b$的第一次订阅。

因此,在这种情况下,控制台将输出:observer c$: 0,0,然后是observer b$: 0

最初的问题是observer c$: 0,0不会出现在控制台输出中。

const c$ = b$.pipe(withLatestFrom(a$) /* (3) */);
b$.subscribe(observer('b$') /* (1) */);
c$.subscribe(observer('c$') /* (2) */);

之所以会发生这种情况,是因为b$首先被订阅,这意味着a$将有它的第一个订户(这里是它发生的地方(。此外,publish的主题将是(1)。接下来,当c$被订阅时,(2)将成为publish的Subject的第二个订户。需要注意的是,a$不会再次被订阅。然而,a$将获得其第二个订户,即(3)。当a$发射时,第一个接收到该值的订户将是由b$引起的订户,因为withLatestFrom的第二个条件将不满足,因为它的源已经发射,但withLatestFrom的可观察器还没有发射任何东西。

一个解决方案是:

/* ... */
const c$ = b$.pipe(
delay(0),
withLatestFrom(a$)
);
b$.subscribe(observer('b$'));
c$.subscribe(observer('c$'));
of(0).subscribe(a$);

通过使用delay(0),,我们确保无论订阅者的顺序如何,withLatestFrom的observable都将是第一个接收值的。这是必要的,因为在这种情况下,b$的形式是a$.pipe(...),而withLatestFrom的自变量是a$。对于delay(0)withLatestFrom的可观测值将始终首先接收值。

作为旁注,也可以使用observeOn(asyncScheduler),而不是delay(0)。在这两种情况下,输出都是:

expensive calculation
observer b$: 0
observer b$: complete
observer c$: 0,0
observer c$: complete

withLatestFrom必须首先发出值

因此,如果您使用BehaviorSubject,您可以执行以下操作:

const _a = new BehaviorSubject<string>('Hello');
const a$ = _a.asObservable();
a$.subscribe();

它应该起作用。

感谢您的回答我还有一些问题:

const c$ = b$.pipe(withLatestFrom(a$) /* (3) */);
b$.subscribe(observer('b$') /* (1) */);
c$.subscribe(observer('c$') /* (2) */);

在这个例子中,我可以这样解释吗?

当b$被订阅时,它在b$的订户阵列中添加第一个订户

当c$被订阅时,它首先添加一个a$的订阅者,然后在b$的订阅者数组中添加第二个订阅者

当a$发出一个值时,它首先通知b$的订阅者数组,然后用LatestFrom可观察通知

当b$的第二个订阅者收到通知时,withLatestFrom observable没有发出值,因此它不会打印任何

@Andrei Gătej

相关内容

  • 没有找到相关文章

最新更新