可观察的,在发出错误后继续发出事件



正如合同在这里所说的:http://reactivex.io/documentation/contract.html

当可观察量发出"已完成"或"错误"通知时, 可观察对象可以释放其资源并终止,并且其 观察者不应试图与它进一步沟通。

在此代码段中,我创建了一个即使在发出错误后也会发出事件的Observable,我的target函数允许从中创建另一个Observable,该在发生错误后不会停止侦听事件。

const { of, interval, concat, throwError } = rxjs;
const { switchMap, map, catchError } = rxjs.operators;
const source = interval(1000).pipe(
map(() => Math.floor(Math.random() * 7)),
switchMap(result => result < 6 ? of(result) : throwError(result)));
const target = source => source.pipe(catchError(err => concat(of('Error: ' + err), target(source))));
target(source).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js"></script>

此代码段的一个用例是创建一个自动完成功能,该自动完成功能在发生 HTTP 错误时不会停止工作。这是反模式吗?它会导致内存泄漏或调用堆栈永远增长吗?

据我所知,不,您只是传递可观察的源,因此订阅不会结束。

此外,源可观察量是定时可观察量,因此它不会阻塞主线程。但是,如果您将interval替换为可观察到的冷of(something)并且错误是可重复的,则您的主线程将冻结

有一个更干净的模式

source.pipe(catchError(err=>...of(err)),repeat())

演示:

const { of, interval, concat, throwError } = rxjs;
const { switchMap, map, repeat, catchError } = rxjs.operators;
const source = interval(1000).pipe(
map(() => Math.floor(Math.random() * 7)),
switchMap(result => result < 6 ? of(result) : throwError(result)),
catchError(err => of('Error: ' + err)),
repeat());
source.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js"></script>

捕获要处理的错误并将其切换到其他流是完全可以的。您还可以使用retry运算符来确保在 N 次尝试后最终会失败。

唯一的注意事项 - 一旦不需要它,请不要忘记取消订阅,它不会产生内存泄漏。

最好的情况是处理它更接近故障,然后你不需要返回原始流或使用repeat/retry运算符。

const { of, interval, concat, throwError } = rxjs;
const { switchMap, map, catchError } = rxjs.operators;
const source = searchTerm$.pipe(
debounce(250),
switchMap(term => this.http.get('/suggestion?=' + term).pipe(
catchError(() => EMPTY), // won't break the parent stream of terms.
),
);
source.subscribe(console.log); // will receive emits until unsubscribe.