为什么 ReplySubject 在订阅具有相同序列的不同可观察量时具有不同的行为?

  • 本文关键字:观察 ReplySubject c# .net system.reactive
  • 更新时间 :
  • 英文 :


example1 创建可观察量由 Observable.Interval :

var observable1 = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3); 
var replaySubject1 = new ReplaySubject<long>(); 
observable1.Subscribe(replaySubject1); // subscribe 
replaySubject1.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
replaySubject1.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
replaySubject1.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));

输出如下:

first:0
second:0
third:0
first:1
second:1
third:1
first:2
second:2
third:2

示例 2 通过 Observable 创建可观察量。创建 :

var observable2 = Observable.Create<long>(observer =>
{
for (var i = 0; i <= 2; i++)
{
observer.OnNext(i);
Thread.Sleep(1000);
}
observer.OnCompleted();
return Disposable.Empty;
}); 
var replaySubject2 = new ReplaySubject<long>(TimeSpan.FromMinutes(1));
observable2.Subscribe(replaySubject2); // subscribe 
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));

输出如下:

first:0
first:1
first:2
second:0
second:1
second:2
third:0
third:1
third:2

假设示例将具有相同的输出,但是,我错了,为什么?

这是因为当您将 ReplaySubject 订阅到 Observable 时,Observable.Create 中的代码会同步执行。

请尝试此异步版本:

var observable2 = Observable.Create<long>(async observer =>
{
for (var i = 0; i <= 2; i++)
{
observer.OnNext(i);
await Task.Delay(1000);
}
observer.OnCompleted();
return Disposable.Empty;
});
var replaySubject2 = new ReplaySubject<long>(TimeSpan.FromMinutes(1));
observable2.Subscribe(replaySubject2); // subscribe 
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));

或者看看 SubscribeOn/ObserveOn。

相关内容

  • 没有找到相关文章

最新更新