RxJS 管道过滤成 2+ 分支



假设我们有一个 Subject,它可以返回一组给定的值,所有这些值都可能需要不同的方法来处理它们。我们可以辩论是否应该如此,但不是我调查的重点。

所以。。。为了解决这个问题,我们可能会让我们的代码看起来像这样:

var sub = new Subject(); 
sub.subscribe( 
data => {
if (caseA) {
// do something
}
if (caseB) {
// do something else
}
});

这一切都是膨胀和伟大和所有...但我想知道是否有一些运算符可以链接以使其更像 rx-?想到过滤器,但链接案例 A 和案例 B 只会使其不起作用(显然(,因为它最终会过滤掉两者。

所以,我的问题归结为:是否有可能有任何类似于波纹管伪代码的东西?您知道任何操作员会像这样工作吗?

var sub = new Subject(); 
sub.pipe(
magicOperator(//handles caseA),
magicOperator(//handles caseB),
)
subscribe( 
data => {
// handles common thread
});

我认为解决此问题的更多 rxjs 方法类似于@ritaj建议的方法,但对分区流使用静态合并运算符(如果您想按顺序处理它们,则进行 concat(

let sub = new Subject();
// i'm using typescript syntax, but you can refer to them by index
let [leftPartition, rightPartition] = sub.pipe( partition( [predicate] ) )
// apply custom logic to the left and right partition
rightPartition = rightPartition.pipe( map( x => x + 1 ) )
leftPartition = leftPartition.pipe( map( x => x - 1 ) )
merge(
leftPartition,
rightPartition
).pipe(
tap( 
( value ) => {
// this stream consumes both right and left partition outputs
}
)
).subscribe();

分区运算符
合并(静态(

静态合并版本是从 rxjs 包导入的,并接受任意数量的参数(您甚至可以使用 spread 传递数组参数(

应用地图运算符后,我将可观察量重新分配给同一变量。我不知道这是否是一种好的做法,但如果您需要完整的源代码,可以改进或分离到不同的变量(

附言。我通常尝试使订阅回调不受任何实现的影响,因为您在流结束时所做的大多数事情都是副作用,我将它们放入 tap 运算符中(与订阅相同的参数(

只需创建一个带有大小写切换函数的对象即可使代码更漂亮

const handler={
case1:()=>..handle case1,
case2:()=>...handle case2
}
sub.map(result=>handler[result])
// experimental switchCase operator 
const switchCase=handlers=>(source)=>source.map(val=>handlers[val]())
// usage
sub.pipe(switchCase({
case1:....,
case2:....
}))
sub.pipe(
magicOperator(//handles caseA),
magicOperator(//handles caseB),
)

您可以通过使用自定义管道操作员(又名 OperatorFunctions(进行一些体操来获得非常接近这种风格的东西。它们可以实现这样的事情,允许您轻松地横向分散可观察量:

sub.pipe(
doAllTheMagic([magicOperator(/*handles caseA*/),magicOperator(/*handles caseB*/)])
)

例如,如果我有一个这样的函数:

function hiFromDad(): OperatorFunction<string, string> {
return (source: Observable<string>) =>
source.pipe(map((name) => `Hi ${name}, I'm Dad`));
}

这可以使用Observable.pipe直接管道:

of('World')
.pipe(hiFromDad())
.subscribe(console.log);

如果我想和好友一起使用:

function hiFromBob(): OperatorFunction<string, string> {
return (source: Observable<string>) =>
source.pipe(map((name) => `Hi ${name}, I'm Bob`));
}

我可以将相同的流发送给两者,让它们根据需要发出:

function hiFromAll(
hiFromOps: Array<OperatorFunction<string, string>>
): OperatorFunction<string, string> {
return (source: Observable<string>) =>
merge(...hiFromOps.map((op) => source.pipe(op)));
}
...
of('World')
.pipe(hiFromAll([hiFromBob(), hiFromDad()]))
.subscribe(console.log);

关于OperatorFunction的一点点: 在 Typescript 中,您可以描述其输入和输出。在上面的例子中,OperatorFunction都接收一个string,并发出一个string。如果您像我一样在理解文档时遇到困难,则返回OperatorFunction函数的一般描述将是:

function makeAPipeable([args]): OperatorFunction<T,U> {
return (source: T) => [output a U object]
}

看到这个堆栈闪电战。

最新更新