RXJS扫描 - 从另一个可观察到的扫描



我正在尝试弄清楚如何在我的输入可观察的发出新值时使用扫描来得出新状态,但我似乎无法使其正常工作。

我想每次输入$ observable发出新的值时输出一个新状态,但应从状态$的当前值派生。

谁能建议我如何解决此问题?我觉得我完全有一个错误的想法: - )

我的代码看起来像这样:

const stateReducer = (state$: Observable<State>, input$: Observable<Input>) => {
  state$ = state$.pipe( startWith(DEFAULT_STATE) );
  const foo$: Observable<State> = input$.pipe(
    filter((input) => isFoo(input)),
    withLatestFrom(state$),
    scan((acc, ([input, state]) => {
        //returns derived state
    });
   const bar$: Observable<State> = input$.pipe(
    filter((input) => isBar(input)),
    withLatestFrom(state$),
    scan((acc, ([input, state]) => {
        //returns derived state
    });
   return merge(
     foo$,
     bar$
   );
}

,由于您想将可观察的结果作为种子价值,switchMap将有所帮助。

switchmap docs

   const bar$: Observable<State> = state$.pipe(
    switchMap((state) => {
      return input$.pipe(
        filter((input) => isBar(input)),  
        scan((curState, input) => {
          // do some logic here
          return {...curState, prop: 'new value'}
        }, state);
      )
    })

我在https://codepen.io/askmattcairns/pen/lyjeozz?editors = 0010中为更完整的解决方案做了一个示例编码。


更多详细信息

switchMap意味着切换到此处的流。因此,此代码在说,一旦state$排放值,将其存储(作为state),然后等待input$发射。

当我们调用scan时,其种子值(scan的第二个属性)现在是state$的发射值的结果。

这将在input$收到新值时发出新值。

更新到代码沙盒

挖掘到您的代码沙箱后,我最好了解问题所在。您可以在https://codesandbox.io/s/elegant-chaum-2b794?file=/src/index.tsx.

上查看我的最终输出。

当您初始化2个内部流foo$bar$时,它们都使用withLatestFrom参考state$。每次 input$发射时,它仍然使用0作为其开始总数引用state的原始值。

最新更新