我有一个对象通过属性公开了下面的可观察对象。
IObservable<HumidityLevel> humidity;
但是上面的可观察对象直到该对象的一个方法被调用之后才被创建。但是订阅者必须在该对象构造时订阅,该对象暴露了上面的可观察对象。
我能想到的一种方法是创建一个空的可观察对象,这样订阅者就可以订阅 IObservable<HumidityLevel> humidity = Observable.Empty<HumidityLevel>();
在对象的生命周期中,当实际的可观察对象准备好时,将其合并到上述现有的可观察对象中。
humidity = humidity.Concat(actualHumidityObservable)
现在上面的行显然修改了订阅者没有订阅的湿度引用,因此他们将永远不会收到这个对象的消息。
我如何实现我想要做的事情?在Rx中是否有任何扩展可以合并到现有的可观察对象中,以便保留订阅者?
主题是这里的关键,它们既是观察者又是可观察对象。除了通过主题的On...
方法手动调用主题上的事件,你也可以直接将它们订阅到源可观察对象——在主题自己的订阅者订阅之前或之后。
你可以这样做:
public class HumidityLevel
{
public int Value;
}
public class Monitor
{
private Subject<HumidityLevel> _humidityLevel =
new Subject<HumidityLevel>();
public IObservable<HumidityLevel> HumidityLevel
{
get
{
return _humidityLevel.AsObservable();
}
}
public void StartMonitoring(IObservable<HumidityLevel> source)
{
source.Subscribe(_humidityLevel);
}
}
注意在getter中使用了AsObservable()
——这是一种保护可观察对象的起源作为底层Subject的方法,从而避免被滥用!
然后像这样使用:
var monitor = new Monitor();
// a subscriber is free to subscribe at any time
var subscriber = monitor.HumidityLevel
.Subscribe(level => Console.WriteLine(level.Value));
// now create an example source
var source = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(i => new HumidityLevel { Value = (int)i });
// and pass it to monitor and the subscriber will start getting events
monitor.StartMonitoring(source);
注意,我在这里没有做任何清理,比如当你完成它时从源取消订阅。是否需要这样做以及如何这样做将取决于您的场景。这可能无关紧要,或者您可能希望跟踪订阅并在重新分配新源时发布它。在这种情况下,您可能对在SerialDisposable
中存储源的订阅感兴趣。这里有很多选项