Rx:一个类似 zip 的运算符,在其中一个流结束后继续



我希望组合异步开始和结束的流(可观察量):

-1----1----1----1---|->
     -2----2--|->
[ optional_zip(sum) ]
-1----3----3----1---|->

我需要它做什么:将音频流添加到一起。它们是音频"块"流,但我在这里用整数表示它们。所以有第一个剪辑播放:

-1----1----1----1---|->

然后第二个开始,过了一会儿:

     -2----2--|->

将它们按总和组合的结果应为:

-1----3----3----1---|->

但是,如果任何压缩流结束,则标准 zip 完成。我希望这个optional_zip继续下去,即使其中一个流结束了。有没有办法在 Rx 中做到这一点,还是我必须通过修改现有的 Zip 来自己实现它?

注意:我正在使用 RxPy,但这里的社区似乎很小,而且 Rx 运算符似乎在语言中非常普遍,所以我也将其标记为 rx-java 和 rx-js。

我会通过将其分为两部分来解决这个问题。首先,我想要一些需要Observable<Observable<T>>并产生一个Observable<Observable<T>[]>的东西,其中数组仅包含"活动"(即非完整)可观察量。每当将新元素添加到外部可观察量时,并且每当其中一个内部可观察量完成时,都会发出一个包含相应可观察量的新数组。这实质上是对主流的"扫描"减少。

一旦你有了可以做到这一点的东西,你就可以使用flatMapLate和zip来获得你想要的东西。

我在第一部分的基本尝试如下:

function active(ss$) {
    const activeStreams = new Rx.Subject();
    const elements = [];
    const subscriptions = [];
    ss$.subscribe(s => {
        var include = true;
        const subscription = s.subscribe(x => {}, x => {}, x => {
            include = false;
            const i = elements.indexOf(s);
            if (i > -1) {
                elements.splice(i, 1);
                activeStreams.onNext(elements.slice());
            }
        });
        if (include) {
            elements.push(s);
            subscriptions.push(subscription);
            activeStreams.onNext(elements.slice());
        }   
    });
    return Rx.Observable.using(        
        () => new Rx.Disposable(() => subscriptions.forEach(x => x.dispose())),
        () => activeStreams
    );
}

从那里,您只需将其压缩并像这样将其压平:

const zipped = active(c$).flatMapLatest(x =>
    x.length === 0 ? Rx.Observable.never()
  : x.length === 1 ? x[0]
  : Rx.Observable.zip(x, (...args) => args.reduce((a, c) => a + c))
);

我假设零活动流应该不产生任何内容,一个活动流应该产生自己的元素,两个或多个流应该全部压缩在一起(所有这些都反映在映射应用程序中)。

我的(诚然相当有限)测试具有这种组合,可以产生您所追求的结果。

顺便说一下,好问题。我还没有看到任何可以解决问题的第一部分的东西(尽管我绝不是 Rx 专家;如果有人知道已经这样做的东西,请发布详细信息)。

所以我得到了一些代码,我认为这些代码可以满足您的大部分需求。基本上,我创建了一个函数zipAndContinue,它将像zip一样运行,只是只要某些底层流仍有数据要发出,它就会继续发出项目。这个函数只用冷可观测量[简短地]测试过。

此外,欢迎更正/增强/编辑。

function zipAndContinue() {
    // Augment each observable so it ends with null
    const observables = Array.prototype.slice.call(arguments, 0).map(x => endWithNull(x));
    const combined$ = Rx.Observable.combineLatest(observables);
    // The first item from the combined stream is our first 'zipped' item
    const first$ = combined$.first();
    // We calculate subsequent 'zipped' item by only grabbing
    // the items from the buffer that have all of the required updated
    // items (remember, combineLatest emits each time any of the streams
    // updates).
    const subsequent$ = combined$
        .skip(1)
        .bufferWithCount(arguments.length)
        .flatMap(zipped)
        .filter(xs => !xs.every(x => x === null));
    // We return the concatenation of these two streams
    return first$.concat(subsequent$)
}

以下是使用的实用程序函数:

function endWithNull(observable) {
    return Rx.Observable.create(observer => {
        return observable.subscribe({
            onNext: x => observer.onNext(x),
            onError: x => observer.onError(x),
            onCompleted: () => {
                observer.onNext(null);
                observer.onCompleted();
            }
        })
    })
}
function zipped(xs) {
    const nonNullCounts = xs.map(xs => xs.filter(x => x !== null).length);
    // The number of streams that are still emitting
    const stillEmitting = Math.max.apply(null, nonNullCounts);
    if (stillEmitting === 0) {
        return Rx.Observable.empty();
    }
    // Skip any intermittent results
    return Rx.Observable.from(xs).skip(stillEmitting - 1);
}

下面是示例用法:

const one$ = Rx.Observable.from([1, 2, 3, 4, 5, 6]);
const two$ = Rx.Observable.from(['one']);
const three$ = Rx.Observable.from(['a', 'b']);
zipAndContinue(one$, two$, three$)
    .subscribe(x => console.log(x));
// >> [ 1, 'one', 'a' ]
// >> [ 2, null, 'b' ]
// >> [ 3, null, null ]
// >> [ 4, null, null ]
// >> [ 5, null, null ]
// >> [ 6, null, null ]

下面是一个 js-fiddle(您可以单击"运行",然后打开控制台): https://jsfiddle.net/ptx4g6wd/

相关内容

  • 没有找到相关文章

最新更新