Example 1: the observable is created with Observable.Interval:
var observable1 = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);
var replaySubject1 = new ReplaySubject<long>();
observable1.Subscribe(replaySubject1); // subscribe
replaySubject1.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
replaySubject1.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
replaySubject1.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));
Output:
first:0
second:0
third:0
first:1
second:1
third:1
first:2
second:2
third:2
Example 2: the observable is created with Observable.Create:
var observable2 = Observable.Create<long>(observer =>
{
    for (var i = 0; i <= 2; i++)
    {
        observer.OnNext(i);
        Thread.Sleep(1000);
    }
    observer.OnCompleted();
    return Disposable.Empty;
});
var replaySubject2 = new ReplaySubject<long>(TimeSpan.FromMinutes(1));
observable2.Subscribe(replaySubject2); // subscribe
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));
Output:
first:0
first:1
first:2
second:0
second:1
second:2
third:0
third:1
third:2
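I assumed both examples would produce the same output, but I was wrong. Why?
This is because the code inside Observable.Create runs synchronously when you subscribe the ReplaySubject to the observable. The Subscribe call does not return until the loop has finished and OnCompleted has been called, so by the time the three observers subscribe, the subject already holds all three values and replays them to each observer in turn.
Try this asynchronous version: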
var observable2 = Observable.Create<long>(async observer =>
{
    for (var i = 0; i <= 2; i++)
    {
        observer.OnNext(i);
        await Task.Delay(1000);
    }
    observer.OnCompleted();
    return Disposable.Empty;
});
var replaySubject2 = new ReplaySubject<long>(TimeSpan.FromMinutes(1));
observable2.Subscribe(replaySubject2); // subscribe
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));
Alternatively, take a look at SubscribeOn/ObserveOn.
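As a rough sketch of the SubscribeOn route: the blocking Observable.Create from example 2 is kept as-is, and only the subscription is moved off the calling thread. This assumes the System.Reactive package and C# 9 top-level statements. The scheduler choice (TaskPoolScheduler.Default), the `received` queue, and the `Log` helper are my own illustration and not part of the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical helper state: records every line so the result can be inspected.
var received = new ConcurrentQueue<string>();
void Log(string line)
{
    received.Enqueue(line);
    Console.WriteLine(line);
}

// Same blocking producer as in example 2.
var observable2 = Observable.Create<long>(observer =>
{
    for (var i = 0; i <= 2; i++)
    {
        observer.OnNext(i);
        Thread.Sleep(1000);
    }
    observer.OnCompleted();
    return Disposable.Empty;
});

var replaySubject2 = new ReplaySubject<long>(TimeSpan.FromMinutes(1));

// SubscribeOn runs the subscription, and therefore the blocking loop,
// on a task-pool thread, so this call returns immediately.
observable2.SubscribeOn(TaskPoolScheduler.Default).Subscribe(replaySubject2);

replaySubject2.Subscribe(onNext: x => Log($"first:{x}"));
replaySubject2.Subscribe(onNext: x => Log($"second:{x}"));
replaySubject2.Subscribe(onNext: x => Log($"third:{x}"));

// Block the main thread until the subject completes; otherwise the
// process could exit before the background loop finishes.
replaySubject2.Wait();
```

Each observer should end up receiving 0, 1 and 2. Because the observers now attach while the loop is still running, the lines are expected to interleave much like in example 1, though the exact order around the first value can depend on timing.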