我如何暂停一个RxJS缓冲的可观察对象基于在缓冲区中的值,因为他们被评估



我有一个包装套接字的可观察对象。来自服务器的事件流(称为source)。来自套接字的每条消息。IO在可观察对象上发出。该可观察对象会根据套接字的内容进行过滤并映射到多个订阅。io消息。

例如:

var filtered = source.filter(function(msg) { return msg.name === 'EventName' });
filtered.subscribe(function handeEventName(msg) { /* do something with msg */ });

有时这些订阅会在我的应用中触发长动画。当这种情况发生时,我想暂停源可观察对象,缓冲新事件,直到动画播放完毕,然后恢复可观察对象。

我已经得到了所有这些工作预期使用pausableBuffered:

var pausable = source.pausableBuffered();
var filtered = pausable.filter(function(msg) { return msg.name === 'EventName' });
filtered.subscribe(function handeEventName(msg) {
    pausable.pause();
    /**
     * do something async, like animation, then when done call
     * pausable.resume();
     */
});

一切顺利。

然而,让我们假设当可观察对象暂停时,五条消息被缓冲。第三条消息需要再次暂停流。为此,它设置了一个订阅。然而,一旦源可观察对象被取消暂停,它立即清空所有五个事件的缓冲区,所有这些事件都被处理并传递给所有五个订阅,此时第三个消息的订阅最终暂停原始流。

我明白为什么会这样,但我真正想要的是:

  1. 源暂停
  2. 五个事件被缓冲,第三个事件应该在它的订阅被处理时暂停源。
  3. 源恢复。
  4. 事件#1和#2由它们的订阅处理,
  5. 事件#3的订阅暂停源。
  6. 可能有更多的事件可以缓冲在#4和#5后面,它们仍然在缓冲区中等待。
  7. 事件#3的订阅在短暂的时间后恢复源
  8. 事件#4和#5以及任何其他事件开始传播,直到另一个应该暂停的事件由源发出。

似乎我使用pausableBuffered的每种方式最终都会将整个缓冲区转储给它们的所有订阅。我怎样才能得到我想要的?

您可以尝试controlled可观察对象。给你完全的控制。例如:

var source = Rx.Observable.interval(300).take(10);
var controlled = source.controlled();
var sourceSub = source.subscribe(
    function (x) {
        console.log('Next source: ' + x.toString());
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
var controlledSub = controlled.subscribe(
    function (x) {
        console.log('Next controlled: ' + x.toString());
        if (x === 3) {
            setTimeout(function(){
                controlled.request(1)
            }, 2000)
        } else {
            controlled.request(1);
        }
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
controlled.request(1);

恰好

最新更新