我现在有了一个类型:
export interface IClickHandler {
index: number; // call order starting from 0
handleClick$(event: any): Observable<boolean>;
}
在handleClick$出现之前是这样的:
handleClick(event: any): boolean;
在每个处理程序中,我创建了一个新方法handleClick$,它调用旧的handleClick:
public handleClick$(evt: any): Observable<boolean> {
return new Observable(subscriber => {
const isHandled = this.handleClick(evt);
subscriber.next(isHandled);
subscriber.complete();
});
}
在我的组件中,我有一个数组字段和如下所示的方法:
private handlers: IClickHandler[] = [];
我有执行每个处理程序的方法。handleClick,直到返回true。
public handleControlClick(event: any) {
// these are ordered in array to ensure call order
for (const handler of this.handlers) {
if (handler.handleClick(event)) {
break;
}
}
}
现在我需要把它改成:
boolean -> Observable<boolean>
改变。这是如何做到的呢?
你可以尝试创建一个所有可观察对象的数组,然后使用merge
,你会得到一个可观察对象,每当每个可观察对象发出一个值时,使用takeWhile
,一旦它变成假,就取消订阅,就像这样
import { takeWhile, interval, tap, merge } from 'rxjs';
const s1 = interval(1000);
const s2 = interval(2000);
const s3 = interval(3000);
const arr = [s1, s2, s3];
const stoppingValue = 7;
const c = merge(...arr).pipe(
tap((value) => console.log(value)),
takeWhile((value) => value !== stoppingValue),
).subscribe();
一种选择是利用concat
和every
组合来连接可观察对象和短路,如果其中一个条件失败(在这种情况下,如果其中一个可观察对象返回true
):
const handlerObjects: IClickHandler[] = [
{
index: 0,
handleClick$: of(false)
},
{
index: 1,
handleClick$: of(false)
},
{
index: 2,
handleClick$: of(true)
},
{
index: 3,
handleClick$: of(false)
},
];
const handlers: Observable<boolean>[] =
handlerObjects
.map(
(handlerObject, index) => handlerObject.handleClick$
// Unnecessary pipe, just for index tracking
.pipe(tap(() => {console.log(`${index} called`)
);
const concatHandlers$: Observable<boolean> = concat(...handlers).pipe(every((value) => !value))
concatHandlers$.subscribe();
// 0 called
// 1 called
// 2 called
// Note: 3 not called due to shortcircuit on the 3rd item returning true
我从评论
我想让第一个可观察对象发出,然后如果发出的值为false下一个,直到某个发出true。然后跳过其余部分。
所以,我理解你想运行所有的处理程序一个Observable发出false
,之后没有其他处理程序必须执行。
如果这种理解是正确的,那么一种方法可能是
// create an Observable which is the merge of all Observables
// use share to make sure that the derived Observables all share the same
// upstream
const mergedObs$ = merge(...arr).pipe(share());
// stop$ emits when the first value is true and then completes since we have take(1)
const stop$ = mergedObs$.pipe(
filter((isTrue) => isTrue),
take(1)
);
// the final Observable emits until stop$ emits
mergedObs$
.pipe(
takeUntil(stop$)
)
.subscribe((value) => console.log(value));