缓冲和减少值,同时切换到另一个可观察值



假设我们有一些可观察到的input$,它发出Item

const input$: Observable<Item>;

对于每个发射,我需要切换到另一个可观察量(类似于switchMap)。但是,我需要确保所有这些切换的可观察量都按顺序完成并运行。很容易,我们必须concatMap实现这一目标:

input$.pipe(concatMap(item => processItem(item)))

但是,我想做的是:缓冲项目并减少它们(即,我有一个函数(a: Item, b: Item): Item),而这些切换的可观察量之一处于活动状态。更具体地说,假设type Item = {[key: string]: string}.在这种情况下,我的减速器只会{...a, ...b}

我们有很多可用的buffer*window*throttle*运算符,尽管我似乎找不到一个简单的组合来实现这种行为。

我可以很好地编写我的自定义运算符,但我感兴趣的是是否可以将其表示为一些内置运算符的(简单)组合

只是为了清楚起见:输出可观察量应该发出我们切换到的可观察量的值,而不是缓冲/减少的值。此外,虽然源的完成/错误应反映在输出中,但任何正在进行的内部订阅都应首先完成。

我正在寻找的运算符本质上应该具有类似于

bufferedConcatMap<T, R>(
project: (value: T) => Observable<R>, 
reducer: (values: T[]) => T
): OperatorFunction<T, R>;

为了完整起见,这是我正在寻找的运算符的大理石图。这假设加法作为化简器,我们只是切换到输入,但延迟四个刻度:

Input:  ---123--|
Output: ------1--(5|)

在这里,1立即切换到我们的延迟(因为没有正在进行的内部订阅),四个刻度后我们得到结果。由于在此期间,23都已发出,因此它们被缓冲在一起并减少到2 + 3 = 5,这再次在四个刻度后发出,因为我们只是在1回来后才切换到这个。

更新 12/13:由于我假设内置运算符的简单组合无法在此处完成工作,因此我实现了自己的运算符。与我最初的要求相反,其行为如下:

  • 如果源完成,则输出将首先等待活动的内部订阅完成,然后再完成外部可观察量。
  • 如果源错误,外部可观察量将立即传播错误。这更符合concatMapexhaustMap等运营商。

我还没有为此编写测试套件,但到目前为止它似乎工作正常。

我将在这里发布操作员的代码,您也可以在这里找到Stackblitz游乐场。

type Reducer<A, B> = (values: A[]) => B;
type Project<A, B> = (value: A) => ObservableInput<B>;
export function bufferReduceMap<A, B, R>(reducer: Reducer<A, B>, project: Project<B, R>): OperatorFunction<A, R> {
return function (source: Observable<A>) {
return source.lift(new BufferReduceMapOperator<A, B, R>(reducer, project));
};
}
class BufferReduceMapOperator<A, B, R> implements Operator<A, R> {
constructor(private reducer: Reducer<A, B>, private project: Project<B, R>) {}
call(subscriber: Subscriber<R>, source: any): TeardownLogic {
return source.subscribe(new BufferReduceMapSubscriber<A, B, R>(subscriber, this.reducer, this.project));
}
}
class BufferReduceMapSubscriber<A, B, R> extends OuterSubscriber<A, B> {
private buffer: A[] = [];
private active = false;
private hasCompleted = false;
private hasErrored = false;
constructor(
destination: Subscriber<R>,
private reducer: Reducer<A, B>,
private project: Project<B, R>,
) {
super(destination);
}
protected _next(value: A) {
const buffer = this.buffer;
buffer.push(value);
this._tryNext();
}
protected _complete() {
this.hasCompleted = true;
if (!this.active && this.buffer.length === 0) {
this.destination.complete();
}
this.unsubscribe();
}
public notifyComplete(innerSub: Subscription) {
this.remove(innerSub);
this.active = false;
if (this.buffer.length !== 0) {
this._tryNext();
} else if (this.hasCompleted) {
this.destination.complete();
}
}
protected _tryNext() {
if (this.active) {
return;
}
let reduced: B;
try {
reduced = this.reducer(this.buffer);
} catch (err) {
this.destination.error(err);
return;
}
let result: ObservableInput<R>;
try {
result = this.project(reduced);
} catch (err) {
this.destination.error(err);
return;
}
this.active = true;
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
this.buffer = [];
subscribeTo<R>(result)(innerSubscriber);
}
}

最新更新