SwitchMap由RXJS中的属性截然不同



假设我有一个动作流。每个操作都分配了一些ID。这样:

const actions$ = of({ id: 1 }, { id: 2 }, { id: 1 });

现在,对于每个动作,我想在switchmap中执行一些逻辑:

actions$.pipe(switchMap(a => /* some cancellable logic */)).subscribe(...);

问题在于,每个发射的动作都取消了先前的"一些可取消逻辑"。

是否可以根据操作 ID 取消"一些可取消逻辑",最好是操作员?类似:

actions$.pipe(switchMapBy('id', a => /*some cancellable logic */)).subscribe(...)

本质上,使用 switchmap 的当前行为:
1. 操作$ 发射ID#1。 switchmap 订阅可观察的嵌套。
2. 操作$ 发射ID#2。 switchmap 从上一个嵌套可观察到的无订阅。订阅新的。
3. 操作$ 发射ID#1。 switchmap 再次从先前的嵌套可观察到。订阅新的。

预期行为
1. 操作$ 发射ID#1。 switchmap 订阅可观察的嵌套。
2. 操作$ 发射ID#2。 switchmap 再次订阅可观察到的嵌套(这次使用#2(。这是区别,它不会从#1
取消一个3. 操作$ 发射ID#1。 switchmap 从#1观察的嵌套中取消订阅。再次订阅,#1。

这似乎是合并操作员的用例。SwitchMap的用例是仅维护一个内部订阅并取消以前的订阅,这不是您在这里追求的。您需要多个内部订阅,并希望当同一ID的新值通过时取消,因此实现一些自定义逻辑来执行此操作。

沿着:

的线条
action$.pipe(
  mergeMap(val => {
    return (/* your transform logic here */)
              .pipe(takeUntil(action$.pipe(filter(a => a.id === val.id)))); // cancel it when the same id comes back through, put this operator at the correct point in the chain
  })
)

您可以通过编写自定义操作员来将其转变为可重复的东西:

import { OperatorFunction, Observable, from } from 'rxjs';
import { takeUntil, filter, mergeMap } from 'rxjs/operators';
export function switchMapBy<T, R>(
  key: keyof T,
  mapFn: (val: T) => Observable<R> | Promise<R>
): OperatorFunction<T, R> {
  return input$ => input$.pipe(
    mergeMap(val => 
      from(mapFn(val)).pipe(
        takeUntil(input$.pipe(filter(i => i[key] === val[key])))
      )
    )
  );
}

并将其像:

action$.pipe(
  switchMapBy('id', (val) => /* your transform logic here */)
);

这是对此的闪电战:https://stackblitz.com/edit/rxjs-x1g4vc?file=index.ts

使用 filter 在switchmap之前操作以排除取消ID,例如此

of({ id: 1 }, { id: 2 }, { id: 1 }).pipe(
   filter(id => ![1,2].includes(id)), // to exclude ids (1,2)
   switchMap(id => /*some cancellable logic */ )
).subscribe(...)

最新更新