rxjs zip并不懒惰



我已经移除了样板以达到

// a.js

// My observables from stream and event
this.a = Rx.Node.fromStream(this.aStream());
this.itemSource = Rx.Observable.fromEvent(ee, 'addItem');
// Zip 'em
this.itemcombo = Rx.Observable.zip(this.a, this.itemSource, function (s1, s2) {
    return {item: s2, a: s1.toString()};
});
// Streams the lowercase alphabet
rb.prototype.aStream = function aStream() {
var rs = Readable();
var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    console.log('Hit!');
    if (c > 'z'.charCodeAt(0)) {
        rs.push(null);
    }
};
return rs;
};

// b.js(需要上面导出的模块)

rb.enqueue('a'); // The method simply does an ee.emit('addItem', ...)  in the module to trigger the itemSource observable

我期望看到的:

控制台中打印的{item: 'a', a: 'a'}

发生了什么:

CCD_ 4在CCD_ 5之前被打印24次。这意味着zipaStream中获取所有值,对它们进行缓冲,然后执行它应该执行的操作

如何获得zip提供的相同功能,但又很懒散?我的目标是使用无限流/可观察流,并将其与有限流(异步)压缩在一起。

编辑

通过可运行的查看/编辑:RX Zip测试编辑2基于答案更新的代码->现在没有输出。

zip确实很懒。它只订阅ab,并在其中任何一个产生新值时进行工作。

您的问题是,一旦zip订阅了fromStream,它就会同步发送所有值。之所以会发生这种情况,是因为您的自定义Readable一直在说"有更多数据可用!"

让你的Readable异步,你就会得到想要的行为。

试试这种(未经测试的)

var rs = Readable();
var subscription = null;
rs._read = function () {
    if (!subscription) {
        // produce the values once per second
        subscription = Rx.Observable
            .generateWithRelativeTime(
                97, // start value
                function (c) { return c > 'z'.charCodeAt(0); }, // end condition
                function (c) { return c + 1; }, // step function
                function (c) { return String.fromCharCode(c); }, // result selector
                function () { return 1000; }) // 1000ms between values
            .subscribe(
                function (s) {
                    rs.push(s);
                    console.log("Hit!");
                },
                function (error) { rs.push(null); },
                function () { rs.push(null); });
    }
};

相关内容

  • 没有找到相关文章

最新更新