像merge一样使用RxJs运算符,但要跟踪结果中的源可观测性



我想像用";合并";操作员,但我仍然想知道哪个输入是可观测的,有办法做到这一点吗?

例如:

private result$ = merge(this.obs1$, this.obs2$).pipe(
scan((result, change) => index + change, 0),
shareReplay(1)
);

这里,来自obs1和obs2的两个值都将进入"0";改变";变量,但如果我可以访问投影仪函数,在该函数中,我可以用不同的名称标记输入可观测值,然后我可以在下面的扫描函数中做不同的事情,这取决于发射的输入可观测。其他操作符,如CombineLatest或ForkJoin,在这里似乎不适用,因为它们需要完成或从所有输入可观察器发出。

如果您需要跟踪哪个可观察的输入发出,那么您可能需要向源可观察添加元数据。在不了解如何使用result$的上下文的情况下,这是具有给定信息的最佳解决方案。

我建议为每个需要跟踪的可观察对象添加一个id属性。然后,您可以根据ID在扫描操作员中使用一些策略。

下面是一个对每个可观测源使用id的简单示例。在scan操作员中,您将看到我的策略如何根据ID进行更改。

import { interval, merge, of } from "rxjs";
import { map, scan, shareReplay } from "rxjs/operators";
const obs1$ = interval(1000).pipe(map(i => ({ i, id: "obs1" })));
const obs2$ = interval(3000).pipe(map(i => ({ i, id: "obs2" })));
let index = 0;
const result$ = merge(obs1$, obs2$).pipe(
scan((result, change) => {
if (change.id === "obs1") {
return index + change.i;
}
if (change.id === "obs2") {
return index + change.i * 2;
}
}, 0),
shareReplay(1)
);
result$.subscribe(console.log);

https://stackblitz.com/edit/rxjs-as5ket

@react-rxjs/utils有一个名为mergeWithKey的util,可以这样使用:

import { Subject } from "rxjs"
import { scan, startWith } from 'rxjs/operators'
import { mergeWithKey } from '@react-rxjs/utils'
const inc$ = new Subject()
const dec$ = new Subject()
const resetTo$ = new Subject<number>()
const counter$ = mergeWithKey({
inc$,
dec$,
resetTo$,
}).pipe(
scan((acc, current) => {
switch (current.type) {
case "inc$":
return acc + 1
case "dec$":
return acc - 1
case "resetTo$":
return current.payload
default:
return acc
}
}, 0),
startWith(0),
)

实现非常直接:


import { merge, Observable, ObservableInput, from, SchedulerLike } from "rxjs"
import { map } from "rxjs/operators"
/**
* Emits the values from all the streams of the provided object, in a result
* which provides the key of the stream of that emission.
*
* @param input object of streams
*/
export const mergeWithKey: <
O extends { [P in keyof any]: ObservableInput<any> },
OT extends {
[K in keyof O]: O[K] extends ObservableInput<infer V>
? { type: K; payload: V }
: unknown
}
>(
x: O,
concurrent?: number,
scheduler?: SchedulerLike,
) => Observable<OT[keyof O]> = (input, ...optionalArgs) =>
merge(
...(Object.entries(input)
.map(
([type, stream]) =>
from(stream).pipe(
map((payload) => ({ type, payload } as any)),
) as any,
)
.concat(optionalArgs) as any[]),
)

这是你需要的吗?

最新更新