在每次从可观测源发射后合并可观测量的最新值



我有一个源可观察量(实际上是一个主题),当该可观察量发出一个值时,我需要启动一个值加载,导致另外 2 个可观察量被更新,当加载完成后,获取源的值并将其与 2 个"依赖"可观察量的最新值相结合, 但只有来自源可观测发射之后的值。 因此,如果 2 个依赖可观察量在源发出时具有值,我需要等到 2 个依赖对象获得更新后再发出所有 3 个可观察量中的最新值。 我尝试使用withLatestFrom,switchMap和combineLatest的各种咒语,但没有什么能给我想要的输出:

这将以正确的形状发出值,但不是在正确的时间,它使用的是 SelectExpenseSummary 完成其操作之前的值,并且 expenseDetails$ 和 expenseReceipts$ 已更新。

this.editExpenseSubject.pipe(
takeWhile(() => this.active),
tap(x => this.userStore.dispatch(new SelectExpenseSummary(x))),
tap(x => this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:tap', x})),
withLatestFrom(
this.expenseDetails$,
this.expenseReceipts$,
)
)
.subscribe(x => {
this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:subscribe', x});
});

这会发出很多次,最后一次具有正确的值,这是可以的,但发出错误的形状,它缺少从输出中可观察到的源:

this.editExpenseSubject.pipe(
takeWhile(() => this.active),
tap(x => this.userStore.dispatch(new SelectExpenseSummary(x))),
tap(x => this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:tap', x})),
switchMap(() =>
combineLatest(this.expenseDetails$,
this.expenseReceipts$,
)
)
)
.subscribe(x => {
this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:subscribe', x});
});

它具有正确的输出形状,具有所有 3 个可观察量,但它永远不会在订阅中发出任何内容,我猜是因为在里面使用了 editExpenseSubject:

this.editExpenseSubject.pipe(
takeWhile(() => this.active),
tap(x => this.userStore.dispatch(new SelectExpenseSummary(x))),
tap(x => this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:tap', x})),
switchMap(x =>
combineLatest(
this.editExpenseSubject
, this.expenseDetails$
, this.expenseReceipts$
)
)
)
.subscribe(x => {
this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:subscribe', x});
});

有人可以指出我正确的咒语来获得我想要的东西,最好是详细说明原因。


更新以获取更多信息(我认为这可以让您完成整个流程):

this.expenseDetails$ = this.expenseStore.pipe(
select(fromExpenses.getExpenseLines)
);
this.expenseReceipts$ = this.expenseStore.pipe(
select(fromExpenses.getExpenseReceipts)
);
export const getExpenseLines = createSelector(StateFeatureSelector, x => x.expenseLines);
export const getExpenseReceipts = createSelector(StateFeatureSelector, x => x.expenseReceipts);
export interface State {
expenseSummaries: Array<IExpenseSummary>;
expenseLines: Array<IExpenseLine>;
expenseReceipts: Array<IExpenseReceipt>;
selectedUser: IDirectReport;
selectedExpenseSummary: IExpenseSummary;
}

@Effect() LoadExpenseLines$ = this.actions$.pipe(
ofType<fromExpense.LoadExpenseLines>(fromExpense.ActionTypes.LoadExpenseLines)
, tap(action => this.log.debug({type: 'ExpenseEffects:LoadExpenseLines$:filtered', action}))
, mergeMap(x =>
this.service.getExpenseLines(x && x.payload)
.pipe(
tap(receipts => this.log.debug({type: 'ExpenseEffects:getExpenseLines', receipts}))
, map(lineItems => new fromExpense.SetExpenseLines(lineItems))
)
)
);
@Effect() LoadExpenseReceipts$ = this.actions$.pipe(
ofType<fromExpense.LoadExpenseReceipts>(fromExpense.ActionTypes.LoadExpenseReceipts)
, tap(action => this.log.debug({type: 'ExpenseEffects:LoadExpenseReceipts$:filtered', action}))
, mergeMap(x =>
this.service.getExpenseReceipts(x && x.payload)
.pipe(
tap(receipts => this.log.debug({type: 'ExpenseEffects:getExpenseReceipts', receipts}))
, map(receipts => new fromExpense.SetExpenseReceipts(receipts))
)
)
);
@Effect() SelectExpenseSummary$ = this.actions$.pipe(
ofType<fromExpense.SelectExpenseSummary>(fromExpense.ActionTypes.SelectExpenseSummary)
, tap(action => this.log.debug({type: 'ExpenseEffects:SelectExpenseSummary$:filtered', action}))
, mergeMap(x =>
[
new fromExpense.LoadExpenseLines(x.payload)
, new fromExpense.LoadExpenseReceipts(x.payload)
]
)
);
export class SelectExpenseSummary implements Action {
readonly type = ActionTypes.SelectExpenseSummary;
constructor(public payload: IExpenseSummary) {}
}
export class LoadExpenseLines implements Action {
readonly type = ActionTypes.LoadExpenseLines;
constructor(public payload: IExpenseSummary) {}
}
export class SetExpenseLines implements Action {
readonly type = ActionTypes.SetExpenseLines;
constructor(public payload: Array<IExpenseLine>) {}
}
export class LoadExpenseReceipts implements Action {
readonly type = ActionTypes.LoadExpenseReceipts;
constructor(public payload: IExpenseSummary) {}
}
export class SetExpenseReceipts implements Action {
readonly type = ActionTypes.SetExpenseReceipts;
constructor(public payload: Array<IExpenseReceipt>) {}
}
export function reducer (state = initialState, action: actions.ActionsUnion): State {
switch (action.type) {
// ...  other actions cut
case actions.ActionTypes.SetExpenseLines:
return {
...state,
expenseLines: action.payload && [...action.payload] || []
};
case actions.ActionTypes.SetExpenseReceipts:
return {
...state,
expenseReceipts: action.payload && [...action.payload] || []
};
default:
return state;
}
}
// from the service class used in the effects
getExpenseLines(summary: IExpenseSummary): Observable<Array<IExpenseLine>> {
this.log.debug({type: 'ExpenseService:getExpenseLines', summary, uri: this.detailUri});
if (summary) {
return this.http
.post<Array<IExpenseLine>>(this.detailUri, {ExpenseGroupId: summary.ExpReportNbr})
.pipe(
tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseLines:reponse', data }))
);
} else {
this.log.log({ type: 'ExpenseService:getExpenseLines null summary'});
return of<Array<IExpenseLine>>([])
.pipe(
tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseLines:reponse', data }))
);
}
}
getExpenseReceipts(summary: IExpenseSummary): Observable<Array<IExpenseReceipt>> {
this.log.debug({type: 'ExpenseService:getExpenseReceipts', summary, uri: this.receiptUri});
if (summary) {
return this.http
.post<Array<IExpenseReceipt>>(this.receiptUri, {ExpenseGroupId: summary.ExpReportNbr})
.pipe(
tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseReceipts:reponse', data }))
);
} else {
this.log.log({ type: 'ExpenseService:getExpenseReceipts null summary'});
return of<Array<IExpenseReceipt>>([])
.pipe(
tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseReceipts:reponse', data }))
);
}
}

我从@Reactgular的假设出发:

您希望从主体中获取一个发出的值,然后从其他 2 个可观察量发出它们的最新值。输出是一个包含 3 个项目的数组。

您希望 2 个内部可观察量在外部可观察量发射之后获取新数据,并且您只需要它们的第一个值。

从那里开始,想到的运算符是skip:它会跳过您要求跳过的值的数量,然后继续。

我制作了一个沙箱供您查看它的实际效果:如您所见,两个可观察量都是从单个源更新的,并且您只会得到一个事件,即每个可观察量的最后一个值。

相关代码 :

import { BehaviorSubject, combineLatest, fromEvent } from "rxjs";
import {
delayWhen,
skip,
distinctUntilKeyChanged,
distinctUntilChanged,
skipUntil,
switchMap,
map
} from "rxjs/operators";
const counts = {
source: 0,
first: 0,
second: 0
};
const source$ = new BehaviorSubject("source = " + counts.source);
const first$ = new BehaviorSubject("first = " + counts.first);
const second$ = new BehaviorSubject("second = " + counts.second);
// Allow the source to emit
fromEvent(document.querySelector("button"), "click").subscribe(() => {
counts.source++;
source$.next("source = " + counts.source);
});
// When the source emits, the others should emit new values
source$.subscribe(source => {
counts.first++;
counts.second++;
first$.next("first = " + counts.first);
second$.next("second = " + counts.second);
});
// Final result should be an array of all observables
const final$ = source$.pipe(
switchMap(source =>
first$.pipe(
skip(1),
switchMap(first =>
second$.pipe(
skip(1),
map(second => [source, first, second])
)
)
)
)
);
final$.subscribe(value => console.log(value));

如果我理解正确的话....

您希望从主体中获取一个发出的值,然后从其他 2 个可观察量发出它们的最新值。输出是一个包含 3 个项目的数组。

您希望 2 个内部可观察量在外部可观察量发射之后获取新数据,并且您只需要它们的第一个值。

this.editExpenseSubject.pipe(
switchMap(editExpense =>
forkJoin(
this.expenseDetails$.pipe(first()),
this.expenseReceipts$.pipe(first()),
).pipe(
map(values => ([editExpense, ...values]))
)
)
)

您可以使用 switchMap(),以便它在内部可观察量上调用订阅,并使用 forkJoin() 获取最终值。添加 first() 运算符以将可观察量限制为 1 个值。然后使用 map() 将外部值放回结果数组中。

最新更新