Rx:如何在满足条件之前缓冲项目



我有一个项目流,我想缓冲它们,直到其中一个项目满足条件。满足条件后,应将缓冲项传递给订阅服务器。

如果源可观察量完成但尚未满足条件,我希望产生错误。

这可以使用 RxJava 或 Rx 系列的其他语言中的默认运算符来实现吗?

我使用 RxJS 编写了下面的示例,以便我们可以在这里实时运行示例。这些概念与RxJava相同。

BufferedSubject

class BufferedSubject extends Rx.Subject {
    constructor(predicate) {
        super();
        this.predicate = predicate;
        this.buffer = [];
        this.unbuffered = false;
    }
    onNext(e) {
        if (this.unbuffered) {
            super.onNext(e);
            return;
        }
        if (this.predicate(e)) {
            while (this.buffer.length) {
                super.onNext(this.buffer.shift());
            }
            super.onNext(e);
            this.unbuffered = true;
            return;
        }
        this.buffer.push(e);
    }
}

您可以像使用常规主体一样使用BufferedSubject作为某些源可观察量和所需的缓冲可观察量之间的中介 - 您订阅主题,主题订阅源可观察量,主体维护项目的缓冲区,直到某些条件成立。通过示例:

const timer = Rx.Observable.interval(5000);
const subject = new BufferedSubject((item) => item > 5);
subject.subscribe(
    x => $('.list').append(`<li>${x} seconds</li>`)
);
timer.subscribe(subject);

作为一个可运行的片段(从 ES6 编译,抱歉):

'use strict';
function _classCallCheck(instance, Constructor) {
    if (!(instance instanceof Constructor)) {
        throw new TypeError('Cannot call a class as a function');
    }
}
function _possibleConstructorReturn(self, call) {
    if (!self) {
        throw new ReferenceError('this hasn't been initialised - super() hasn't been called');
    }
    return call && (typeof call === 'object' || typeof call === 'function') ? call : self;
}
function _inherits(subClass, superClass) {
    if (typeof superClass !== 'function' && superClass !== null) {
        throw new TypeError('Super expression must either be null or a function, not ' + typeof superClass);
    }
    subClass.prototype = Object.create(superClass && superClass.prototype, {
        constructor: {
            value: subClass,
            enumerable: false,
            writable: true,
            configurable: true
        }
    });
    if (superClass)
        Object.setPrototypeOf ? Object.setPrototypeOf(subClass, superClass) : subClass.__proto__ = superClass;
}
var BufferedSubject = function (_Rx$Subject) {
    _inherits(BufferedSubject, _Rx$Subject);
    function BufferedSubject(predicate) {
        _classCallCheck(this, BufferedSubject);
        var _this = _possibleConstructorReturn(this, _Rx$Subject.call(this));
        _this.predicate = predicate;
        _this.buffer = [];
        _this.unbuffered = false;
        return _this;
    }
    BufferedSubject.prototype.onNext = function onNext(e) {
        if (this.unbuffered) {
            _Rx$Subject.prototype.onNext.call(this, e);
            return;
        }
        if (this.predicate(e)) {
            while (this.buffer.length) {
                _Rx$Subject.prototype.onNext.call(this, this.buffer.shift());
            }
            _Rx$Subject.prototype.onNext.call(this, e);
            this.unbuffered = true;
            return;
        }
        this.buffer.push(e);
    };
    return BufferedSubject;
}(Rx.Subject);
var timer = Rx.Observable.interval(1000).skip(1).take(50).publish();
var subject = new BufferedSubject(function (item) {
    return item > 5;
});
subject.subscribe(function (x) {
    return $('.list').append('<li>' + x + ' seconds</li>');
}, function (e) {
    return $('.list').append('<li>Errror ' + e + '</li>');
}, function (_) {
    return $('.list').append('<li>Sequence complete</li>');
});
timer.subscribe(subject);
timer.subscribe(function (x) {
    return $('.timer').text(x + ' seconds elapsed');
});
timer.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.min.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>
<p>Output (<span class='timer'>-</span>):</p>
<ul class="list">
</ul>

(也可在 CodePen 上使用。

注意:我会说此实现不能正确支持背压。

最新更新