如何迭代可观察数组<boolean>并在可观察量发出某个值时中断?



我现在有了一个类型:

export interface IClickHandler {
index: number; // call order starting from 0
handleClick$(event: any): Observable<boolean>;
}

在handleClick$出现之前是这样的:

handleClick(event: any): boolean;

在每个处理程序中,我创建了一个新方法handleClick$,它调用旧的handleClick:

public handleClick$(evt: any): Observable<boolean> {
return new Observable(subscriber => {
const isHandled = this.handleClick(evt);
subscriber.next(isHandled);
subscriber.complete();
});
}

在我的组件中,我有一个数组字段和如下所示的方法:

private handlers: IClickHandler[] = [];

我有执行每个处理程序的方法。handleClick,直到返回true。

public handleControlClick(event: any) {
// these are ordered in array to ensure call order
for (const handler of this.handlers) {
if (handler.handleClick(event)) {
break;
}
}
}

现在我需要把它改成:

boolean -> Observable<boolean>

改变。这是如何做到的呢?

你可以尝试创建一个所有可观察对象的数组,然后使用merge,你会得到一个可观察对象,每当每个可观察对象发出一个值时,使用takeWhile,一旦它变成假,就取消订阅,就像这样

import { takeWhile, interval, tap, merge } from 'rxjs';
const s1 = interval(1000);
const s2 = interval(2000);
const s3 = interval(3000);
const arr = [s1, s2, s3];
const stoppingValue = 7;
const c = merge(...arr).pipe(
tap((value) => console.log(value)),
takeWhile((value) => value !== stoppingValue),
).subscribe();

一种选择是利用concatevery组合来连接可观察对象和短路,如果其中一个条件失败(在这种情况下,如果其中一个可观察对象返回true):

设置>
const handlerObjects: IClickHandler[] = [
{
index: 0,
handleClick$: of(false)
},
{
index: 1,
handleClick$: of(false)
},
{
index: 2,
handleClick$: of(true)
},
{
index: 3,
handleClick$: of(false)
},
];

const handlers: Observable<boolean>[] =
handlerObjects
.map(
(handlerObject, index) => handlerObject.handleClick$
// Unnecessary pipe, just for index tracking
.pipe(tap(() => {console.log(`${index} called`)
);
const concatHandlers$: Observable<boolean> = concat(...handlers).pipe(every((value) => !value))
concatHandlers$.subscribe();
// 0 called
// 1 called
// 2 called
// Note: 3 not called due to shortcircuit on the 3rd item returning true

我从评论

我想让第一个可观察对象发出,然后如果发出的值为false下一个,直到某个发出true。然后跳过其余部分。

所以,我理解你想运行所有的处理程序一个Observable发出false,之后没有其他处理程序必须执行。

如果这种理解是正确的,那么一种方法可能是

// create an Observable which is the merge of all Observables
// use share to make sure that the derived Observables all share the same
// upstream
const mergedObs$ = merge(...arr).pipe(share());
// stop$ emits when the first value is true and then completes since we have take(1)
const stop$ = mergedObs$.pipe(
filter((isTrue) => isTrue),
take(1)
);
// the final Observable emits until stop$ emits
mergedObs$
.pipe(
takeUntil(stop$)
)
.subscribe((value) => console.log(value));

最新更新