RxJs 5 share() 运算符如何工作



对我来说,RxJs 5 share()运算符的工作原理不是 100% 清楚,请参阅此处的最新文档。Jsbin在这里回答问题。

如果我创建一个由 0 到 2 组成的可观察量,每个值用一秒分隔:

var source = Rx.Observable.interval(1000)
.take(5)
.do(function (x) {
    console.log('some side effect');
});

如果我为此可观察量创建两个订阅者:

source.subscribe((n) => console.log("subscriptor 1 = " + n));
source.subscribe((n) => console.log("subscriptor 2 = " + n));

我在控制台中得到这个:

"some side effect ..."
"subscriptor 1 = 0"
"some side effect ..."
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"some side effect ..."
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"some side effect ..."
"subscriptor 2 = 2"

我以为每个订阅都会订阅相同的可观察内容,但事实似乎并非如此!就像订阅的行为创建了一个完全独立的可观察对象!

但是,如果将share()运算符添加到可观察的源中:

var source = Rx.Observable.interval(1000)
.take(3)
.do(function (x) {
    console.log('some side effect ...');
})
.share();

然后我们得到这个:

"some side effect ..."
"subscriptor 1 = 0"
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"subscriptor 2 = 2"

这就是我在没有share()的情况下所期望的.

这是怎么回事,share()操作员是如何工作的? 每个订阅是否创建一个新的可观察链?

请注意,您使用的是 RxJS v5,而您的文档链接似乎是 RxJS v4。我不记得具体细节,但我认为share运营商经历了一些变化,特别是在完成和重新订阅方面,但不要相信我的话。

回到你的问题,正如你在研究中展示的那样,你的期望与图书馆设计不对应。可观察量延迟实例化其数据流,具体地在订阅者订阅时启动数据流。当第二个订阅者订阅相同的可观察量时,将启动另一个新的数据流,就好像它是第一个订阅者一样(所以是的,正如你所说,每个订阅都会创建一个新的可观察量链(。这就是 RxJS 术语中创造的冷可观察量,这是 RxJS 可观察量的默认行为。如果你想要一个在数据到达时将其数据发送给订阅者,这被称为热可观察量,获得热可观察量的一种方法是使用 share 运算符。

您可以在此处找到图解订阅和数据流:热可观察量和冷可观察量:是否有"热"和"冷"运算符?(这对 RxJS v4 有效,但其中大部分对 v5 有效(。

如果满足以下 2 个条件,则共享使可观察的"热":

  1. 订阅者数量> 0
  2. 并且可观察量尚未完成

方案 1:订阅者数> 0 且在新订阅之前未完成可观察

var shared  = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};
var observer1 = shared.subscribe(log('observer1')),
    observer2;
setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 3000);
// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds
// another emission for both observers at: startTime + 10 seconds

方案 2:在新订阅之前订阅者数为零。变得"冷">

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};
var observer1 = shared.subscribe(log('observer1')),
    observer2;
setTimeout(()=>{
    observer1.unsubscribe(); 
}, 1000);
setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time
}, 3000);
// observer2's onNext is called at startTime + 8 seconds
// observer2's onNext is called at startTime + 13 seconds

方案 3:在新订阅之前完成可观察时。变得"冷">

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
        console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
    };
var observer1 = shared.subscribe(log('observer1')),
    observer2;
setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 12000);
// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs

最新更新