RXJS:我应该如何通过socket.js事件更新可观察的元素



试图重新实现公司从承诺和命令样式转换为RXJ和功能样式的主要应用。并发现自己处于特定情况。

问题:我们有一个可以过滤的系统中帖子列表。因此,我创建了一个非常简单的过滤器组件和一个列表组件,其中我收到这样的数据:

this.items = this.filterChange
    .startWith(this.filterComponent.initialValue);
    .debounceTime(400)
    .switchMap((data) => this.process(data))
    .share();

,但是然后我需要从插座上插入另一个事件,以更新列表中的各个元素。我们的process功能可以理解事件并在它们上做出反应,但问题是:

merge(this.filterChange.debounceTime(400), this.socketUpdates)
    .startWith(this.filterComponent.initialValue);
    .switchMap((data) => this.process(data))
    .share();

如果更改搜索后发生更新,我们将失去搜索更改结果,在这种情况下我该怎么办?

ps:我已将其视为选项combineLatest pariwise具有更改和反应的能力。没有更优雅的解决方案吗?

这实际上是rxjs的常见错误,当您应该使用 concatMap(或 mergeMap(时,您正在使用switchMap

switchMap仅具有非副效应的只读请求/消息才真正安全。如果您正在尝试通过网络更新记录,并且您关心其完成的顺序并处理结果,则要使用concatMap。如果您不在乎结果的顺序,则应使用mergeMap

  • switchMap-将在获得新值时可观察到的任何待处理,从而删除其响应。
  • mergeMap-将立即启动并运行所有可观察到的东西,并完成完成,而无需注意结果合并后返回的顺序。
  • concatMap-一次只能按顺序观察一个可观察的,什么也没掉。
merge(this.filterChange.debounceTime(400), this.socketUpdates)
    .startWith(this.filterComponent.initialValue);
    .concatMap((data) => this.process(data))
    .share();

如果所需的行为是为了防止插座上的任何更新,直到搜索输入被拒绝,则应使一个流取决于另一个流,而不是合并。这可能是一种适合您用例的方法:

this.filterChange.switchMap( data => {
    return Observable.of( data )
           .debounceTime( 400 )
           .concat( this.socketUpdates );
  })
  .startWith(this.filterComponent.initialValue);
  .switchMap((data) => this.process(data))
  .share();

我不知道您的debounceTime操作员的用例是什么,如果您要拒绝文本输入或其他数据,但是如果愿意,您应该能够将其移出switchMap。上面的实现可能确实是"聊天"的,并订阅了套接字通信流,这也可能对您产生后果。

this.filterChange.debounceTime( 400 ).switchMap( data => {
    return Observable.of( data ).concat( this.socketUpdates );
  })
  .startWith(this.filterComponent.initialValue);
  .switchMap((data) => this.process(data))
  .share();

最新更新