我在类中有一个IObservable<Item>
,我想公开一个只读属性,该属性提供在给定时间推送到可观察对象的最后一个项。因此,它将提供Item
的单个值。
如果没有推送任何值,则必须返回默认值。
我如何做到这一点,而不必订阅可观察到的内容并拥有"支持字段"?
只是在这里补充@Asti的答案,也许可以帮助你克服挫折:
可观测不是一个物理的"东西",它更多的是一个逻辑概念。Rx经常被比作LINQ,在很多时候这是一个公平的比较。不过,当你开始谈论数据结构时,它会崩溃:LINQ的可枚举项与Lists非常相似,可以用于学习目的。
然而,在Rx方面,根本没有与List等效的好东西。可观察的是一个瞬态数据结构,所有操作符都处理这个瞬态。如果你正在寻找一个永久状态,你就要离开Rx。
话虽如此,将可观察状态转换为某种状态是一个常见的问题,有一些包可能会对您有所帮助:ReactiveUI可能是最为人所知的。ReactiveProperty是另一种。这两个软件包都有缺陷,但可能会对您有所帮助。
如果你只是在寻找一种更简单的方法来获得背景场,而不需要锅炉电镀出背景场,这将起作用:
public static class ReactivePropertyExtensions
{
public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source)
{
return new ReactiveProperty<T>(source);
}
public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T defaultValue)
{
return new ReactiveProperty<T>(source, defaultValue);
}
}
public class ReactiveProperty<T> : IDisposable
{
private IObservable<T> Source { get; }
private IDisposable Subscription { get; }
public T Value { get; private set; }
public ReactiveProperty(IObservable<T> source)
: this(source, default(T)) { }
public ReactiveProperty(IObservable<T> source, T defaultValue)
{
Value = defaultValue;
Source = source;
Subscription = source.Subscribe(t => Value = t);
}
public void Dispose()
{
Subscription.Dispose();
}
}
示例用法:
var ticker = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish().RefCount();
var latestTickerValue = ticker.ToReactiveProperty();
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine(latestTickerValue.Value);
假设一个热的可观测到的。
对于observable = source.Replay(1); observable.Connect();
提供价值:
public int Value =>
observable.Take(1).Amb(Observable.Return(defaultValue)).Wait();
如果没有推送任何值,这将返回一个默认值。
你想要从被动状态过渡到状态,所以后备场不是一个糟糕的选择。你提到你不想订阅,但要观察任何东西:什么,某处必须订阅。
根据Asti解决方案的精神,这里有另一种定义Value
属性的方法。
private readonly IObservable<Item> _source;
private readonly IObservable<Item> _lastValue;
public SomeClass() // Constructor
{
_source = /* Initialize the source observable (hot) */
_lastValue = _source
.Catch(Observable.Never<Item>())
.Concat(Observable.Never<Item>())
.Publish(default)
.AutoConnect(0)
.FirstAsync();
}
public Item Value => _lastValue.Wait();
接受initialValue
参数的Publish
运算符。。。
返回一个可连接的可观察序列,该序列共享对基础序列的单个订阅,并以
initialValue
开头。该运算符是使用BehaviorSubject<T>
的Multicast
的专门化。
BehaviorSubject<T>
是一个专门的ISubject<T>
,它。。。
表示一个随时间变化的值。观察者可以订阅主题以接收最后(或初始)值和所有后续通知。
添加了Catch
和Concat
运算符,以便保留最后一个值,即使在源序列正常或异常完成的情况下也是如此。
就我个人而言,我会犹豫是否使用这个解决方案,因为在Do
操作符中更新的volatile
字段会更自然地完成同样的事情。我发布它主要是为了展示Rx的功能。