角度:分叉加入 ngrx 可观察量



我有一个包含多个状态的ngrx存储。我正在尝试通过从每个状态收集数据来构造一个对象(Rent 对象(:这是我的代码:

ngOnInit() {

  //Fetch the rent and its properties data
  const rentID = +this.route.snapshot.params['id'];
  let rentState$ = this.store.select('rentState');
  let clientState$ = this.store.select('clientState');
  let vehiculeState$ = this.store.select('vehiculesState');
  let paymentState$ = of(null);//TODO
  let importRent$ = forkJoin([rentState$, vehiculeState$, clientState$, paymentState$])
    .pipe(mergeMap(([rentData, vehiculeData, clientData, paymentData]: [any, any, any, any]) => {
      let rent = new Rent();
      rent = rentData.rentList.filter(rent => rent.id === rentID)[0];
      rent.vehicule = vehiculeData.vehicules.filter(vehicule => vehicule.id === this.rent.vehicule.id)[0];
      rent.client = clientData.clientList.filter(client => client.id === this.rent.client.id)[0];
      rent.companion = clientData.companionList.filter(companion => companion.id === this.rent.companion.id)[0];
      //TODO: retrieve payments
      return of(rent);
    }))

  importRent$.subscribe(data => console.log('hello'));
}

但是我在控制台中没有收到任何消息"hello"。由于某种原因,订阅代码不会发生。

我的代码中已经有这些导入:

import { Observable } from 'rxjs/Observable';
import { forkJoin } from 'rxjs/observable/forkJoin';
import { of } from 'rxjs/observable/of';
import { mergeMap } from 'rxjs/operators';

存储中已有数据。知道我做错了什么吗?

你肯定所有状态都发出并完成吗?试试combineLatest?而且我认为您不需要将可观察量放在数组中作为forkJoin的参数.

当使用combineLatest时,每次一个child-source Observable在所有child-source的初始发射之后发出时,整个东西都会发出。如果您不想要这种情况over-emitting您可能还需要研究distinctUntilChanged

1 emits, 2 emits => combineLatest emits => 1 emits => combineLatest emits 等等...

正如@martin在他的评论中所说

forkJoin需要完成所有源可观察量。

所以你只需要取take(1)first()发出的第一个值:

let importRent$ = combineLatest(
  rentState$.pipe(take(1)), 
  vehiculeState$.pipe(take(1)), 
  clientState$.pipe(take(1)), 
  paymentState$.pipe(take(1))
)

最新更新