import { of, Subject, interval } from 'rxjs';
import {
tap,
startWith,
map,
delay,
publishReplay,
publish,
refCount,
withLatestFrom,
switchMap,
take
} from 'rxjs/operators';
const a$ = new Subject();
const b$ = a$.pipe(
map(a => {
console.log('expensive calculation');
return a;
}),
publishReplay(),
refCount()
);
const c$ = b$.pipe(withLatestFrom(a$));
b$.subscribe(b => console.log(`b$`));
c$.subscribe(c => console.log(`c$`)); // my problem, why c$ not emit value
of(0).subscribe(a$);
我不知道为什么没有打印"c$"这是我的在线代码,https://stackblitz.com/edit/rxjs-pc5y8d?devtoolsheight=60&file=index.ts
TLDR
正在使用StackBlitz应用程序。
解释性解决方案
我将从一个有趣的观察开始:如果您对订阅b$
的行(b$.subscribe(observer('b$'));
(进行注释,则将执行c$
观察器的next
回调:
const a$ = new Subject();
const b$ = a$.pipe(
map(a => {
console.log('expensive calculation');
return a;
}),
publish(),
refCount()
);
function observer(name: string) {
return {
next: (value: number) => {
console.log(`observer ${name}: ${value}`);
},
complete: () => {
console.log(`observer ${name}: complete`);
}
};
}
const c$ = b$.pipe(withLatestFrom(a$));
// b$.subscribe(observer('b$'));
c$.subscribe(observer('c$'));
of(0).subscribe(a$);
/*
Console output:
expensive calculation
observer c$: 0,0
observer c$: complete
*/
另一个观察结果是,如果您更改订阅者的顺序,也会调用相同的next
回调:首先订阅c$
,然后订阅b$
:
const a$ = new Subject();
const b$ = a$.pipe(
map(a => {
console.log('expensive calculation');
return a;
}),
publish(),
refCount()
);
function observer(name: string) {
return {
next: (value: number) => {
console.log(`observer ${name}: ${value}`);
},
complete: () => {
console.log(`observer ${name}: complete`);
}
};
}
const c$ = b$.pipe(
withLatestFrom(a$)
);
c$.subscribe(observer('c$'));
b$.subscribe(observer('b$'));
of(0).subscribe(a$);
/*
Console output:
expensive calculation
observer c$: 0,0
observer b$: 0
observer c$: complete
observer b$: complete
*/
我们将首先了解这些观察背后的原因,然后我们将提出解决方案。
首先,重要的是要知道Subject
通过使用订阅者列表来跟踪其订阅者。当主题发出一个值(subject.next()
(时,主题的所有订阅者都将根据他们订阅主题的顺序接收该值。
在第一个例子中:
const a$ = new Subject();
const b$ = a$.pipe(/* ... */);
const c$ = b$/* subscriber 2 */.pipe(withLatestFrom(a$/* subscriber 1 */));
// b$.subscribe(observer('b$'));
c$.subscribe(observer('c$'));
订阅c$
时,它将首先订阅传递给withLatestFrom
的每个参数,然后订阅b$
。但由于b$
是基于a$
的,这意味着a$
主题最终将有2个订阅者
为了使withLatestFrom
发出值,必须同时满足两个条件:
- 其源(在本例中为
b$
(必须发出一个值 - 所有的
withLatestFrom
参数(可观测(至少发出一次
在这种情况下,由于唯一的withLatestFrom
的参数是首先订阅到a$
,因此将满足上述所有条件,这就是为什么observer c$: 0,0
将打印到控制台的原因。
在第二个例子中:
const a$ = new Subject();
const b$ = a$.pipe(/* ... */);
const c$ = b$.pipe(
withLatestFrom(a$)
);
c$.subscribe(observer('c$')); /* b$'s first subscription */
b$.subscribe(observer('b$')); /* b$'s second subscription */
of(0).subscribe(a$);
直到b$.subscribe(observer('b$'));
,发生了与我上面描述的相同的事情。b$.subscribe(observer('b$'));
所做的是将另一个订阅者添加到Subject实例中,但这次不是a$
,而是属于publish()
的主题。当第一次订阅发生时,publish
将创建一个Subject实例,并将该新订阅者添加到其中,但它也将订阅源(这在内部发生(。同样,这种情况只发生在第一次订阅时。在随后的订阅中,订阅者将被添加到publish
维护的主题中a$
主题也将有2个订阅者。一个来自withLatestFrom
,一个来自于b$
的第一次订阅。
因此,在这种情况下,控制台将输出:observer c$: 0,0
,然后是observer b$: 0
。
最初的问题是observer c$: 0,0
不会出现在控制台输出中。
const c$ = b$.pipe(withLatestFrom(a$) /* (3) */);
b$.subscribe(observer('b$') /* (1) */);
c$.subscribe(observer('c$') /* (2) */);
之所以会发生这种情况,是因为b$
首先被订阅,这意味着a$
将有它的第一个订户(这里是它发生的地方(。此外,publish
的主题将是(1)
。接下来,当c$
被订阅时,(2)
将成为publish
的Subject的第二个订户。需要注意的是,a$
不会再次被订阅。然而,a$
将获得其第二个订户,即(3)
。当a$
发射时,第一个接收到该值的订户将是由b$
引起的订户,因为withLatestFrom
的第二个条件将不满足,因为它的源已经发射,但withLatestFrom
的可观察器还没有发射任何东西。
一个解决方案是:
/* ... */
const c$ = b$.pipe(
delay(0),
withLatestFrom(a$)
);
b$.subscribe(observer('b$'));
c$.subscribe(observer('c$'));
of(0).subscribe(a$);
通过使用delay(0),
,我们确保无论订阅者的顺序如何,withLatestFrom
的observable都将是第一个接收值的。这是必要的,因为在这种情况下,b$
的形式是a$.pipe(...)
,而withLatestFrom
的自变量是a$
。对于delay(0)
,withLatestFrom
的可观测值将始终首先接收值。
作为旁注,也可以使用observeOn(asyncScheduler)
,而不是delay(0)
。在这两种情况下,输出都是:
expensive calculation
observer b$: 0
observer b$: complete
observer c$: 0,0
observer c$: complete
withLatestFrom
必须首先发出值
因此,如果您使用BehaviorSubject,您可以执行以下操作:
const _a = new BehaviorSubject<string>('Hello');
const a$ = _a.asObservable();
a$.subscribe();
它应该起作用。
感谢您的回答我还有一些问题:
const c$ = b$.pipe(withLatestFrom(a$) /* (3) */);
b$.subscribe(observer('b$') /* (1) */);
c$.subscribe(observer('c$') /* (2) */);
在这个例子中,我可以这样解释吗?
当b$被订阅时,它在b$的订户阵列中添加第一个订户
当c$被订阅时,它首先添加一个a$的订阅者,然后在b$的订阅者数组中添加第二个订阅者
当a$发出一个值时,它首先通知b$的订阅者数组,然后用LatestFrom可观察通知
当b$的第二个订阅者收到通知时,withLatestFrom observable没有发出值,因此它不会打印任何
@Andrei Gătej