假设我们有一个 Subject,它可以返回一组给定的值,所有这些值都可能需要不同的方法来处理它们。我们可以辩论是否应该如此,但不是我调查的重点。
所以。。。为了解决这个问题,我们可能会让我们的代码看起来像这样:
var sub = new Subject();
sub.subscribe(
data => {
if (caseA) {
// do something
}
if (caseB) {
// do something else
}
});
这一切都是膨胀和伟大和所有...但我想知道是否有一些运算符可以链接以使其更像 rx-?想到过滤器,但链接案例 A 和案例 B 只会使其不起作用(显然(,因为它最终会过滤掉两者。
所以,我的问题归结为:是否有可能有任何类似于波纹管伪代码的东西?您知道任何操作员会像这样工作吗?
var sub = new Subject();
sub.pipe(
magicOperator(//handles caseA),
magicOperator(//handles caseB),
)
subscribe(
data => {
// handles common thread
});
我认为解决此问题的更多 rxjs 方法类似于@ritaj建议的方法,但对分区流使用静态合并运算符(如果您想按顺序处理它们,则进行 concat(
let sub = new Subject();
// i'm using typescript syntax, but you can refer to them by index
let [leftPartition, rightPartition] = sub.pipe( partition( [predicate] ) )
// apply custom logic to the left and right partition
rightPartition = rightPartition.pipe( map( x => x + 1 ) )
leftPartition = leftPartition.pipe( map( x => x - 1 ) )
merge(
leftPartition,
rightPartition
).pipe(
tap(
( value ) => {
// this stream consumes both right and left partition outputs
}
)
).subscribe();
分区运算符
合并(静态(
静态合并版本是从 rxjs 包导入的,并接受任意数量的参数(您甚至可以使用 spread 传递数组参数(
应用地图运算符后,我将可观察量重新分配给同一变量。我不知道这是否是一种好的做法,但如果您需要完整的源代码,可以改进或分离到不同的变量(
附言。我通常尝试使订阅回调不受任何实现的影响,因为您在流结束时所做的大多数事情都是副作用,我将它们放入 tap 运算符中(与订阅相同的参数(
只需创建一个带有大小写切换函数的对象即可使代码更漂亮
const handler={
case1:()=>..handle case1,
case2:()=>...handle case2
}
sub.map(result=>handler[result])
// experimental switchCase operator
const switchCase=handlers=>(source)=>source.map(val=>handlers[val]())
// usage
sub.pipe(switchCase({
case1:....,
case2:....
}))
sub.pipe(
magicOperator(//handles caseA),
magicOperator(//handles caseB),
)
您可以通过使用自定义管道操作员(又名 OperatorFunctions(进行一些体操来获得非常接近这种风格的东西。它们可以实现这样的事情,允许您轻松地横向分散可观察量:
sub.pipe(
doAllTheMagic([magicOperator(/*handles caseA*/),magicOperator(/*handles caseB*/)])
)
例如,如果我有一个这样的函数:
function hiFromDad(): OperatorFunction<string, string> {
return (source: Observable<string>) =>
source.pipe(map((name) => `Hi ${name}, I'm Dad`));
}
这可以使用Observable.pipe
直接管道:
of('World')
.pipe(hiFromDad())
.subscribe(console.log);
如果我想和好友一起使用:
function hiFromBob(): OperatorFunction<string, string> {
return (source: Observable<string>) =>
source.pipe(map((name) => `Hi ${name}, I'm Bob`));
}
我可以将相同的流发送给两者,让它们根据需要发出:
function hiFromAll(
hiFromOps: Array<OperatorFunction<string, string>>
): OperatorFunction<string, string> {
return (source: Observable<string>) =>
merge(...hiFromOps.map((op) => source.pipe(op)));
}
...
of('World')
.pipe(hiFromAll([hiFromBob(), hiFromDad()]))
.subscribe(console.log);
关于OperatorFunction
的一点点: 在 Typescript 中,您可以描述其输入和输出。在上面的例子中,OperatorFunction
都接收一个string
,并发出一个string
。如果您像我一样在理解文档时遇到困难,则返回OperatorFunction
函数的一般描述将是:
function makeAPipeable([args]): OperatorFunction<T,U> {
return (source: T) => [output a U object]
}
看到这个堆栈闪电战。