取最后一个推送到Observable(序列)的项目



我在类中有一个IObservable<Item>,我想公开一个只读属性,该属性提供在给定时间推送到可观察对象的最后一个项。因此,它将提供Item的单个值。

如果没有推送任何值,则必须返回默认值。

我如何做到这一点,而不必订阅可观察到的内容并拥有"支持字段"?

只是在这里补充@Asti的答案,也许可以帮助你克服挫折:

可观测不是一个物理的"东西",它更多的是一个逻辑概念。Rx经常被比作LINQ,在很多时候这是一个公平的比较。不过,当你开始谈论数据结构时,它会崩溃:LINQ的可枚举项与Lists非常相似,可以用于学习目的。

然而,在Rx方面,根本没有与List等效的好东西。可观察的是一个瞬态数据结构,所有操作符都处理这个瞬态。如果你正在寻找一个永久状态,你就要离开Rx。

话虽如此,将可观察状态转换为某种状态是一个常见的问题,有一些包可能会对您有所帮助:ReactiveUI可能是最为人所知的。ReactiveProperty是另一种。这两个软件包都有缺陷,但可能会对您有所帮助。

如果你只是在寻找一种更简单的方法来获得背景场,而不需要锅炉电镀出背景场,这将起作用:

public static class ReactivePropertyExtensions
{
public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source)
{
return new ReactiveProperty<T>(source);
}
public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T defaultValue)
{
return new ReactiveProperty<T>(source, defaultValue);
}
}
public class ReactiveProperty<T> : IDisposable
{
private IObservable<T> Source { get; }
private IDisposable Subscription { get; }
public T Value { get; private set; }
public ReactiveProperty(IObservable<T> source)
: this(source, default(T)) { }
public ReactiveProperty(IObservable<T> source, T defaultValue)
{
Value = defaultValue;
Source = source;
Subscription = source.Subscribe(t => Value = t);
}
public void Dispose()
{
Subscription.Dispose();
}
}

示例用法:

var ticker = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish().RefCount();
var latestTickerValue = ticker.ToReactiveProperty();
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine(latestTickerValue.Value);

假设一个热的可观测到的。

对于observable = source.Replay(1); observable.Connect();

提供价值:

public int Value => observable.Take(1).Amb(Observable.Return(defaultValue)).Wait();

如果没有推送任何值,这将返回一个默认值。

你想要从被动状态过渡到状态,所以后备场不是一个糟糕的选择。你提到你不想订阅,但要观察任何东西:什么,某处必须订阅

根据Asti解决方案的精神,这里有另一种定义Value属性的方法。

private readonly IObservable<Item> _source;
private readonly IObservable<Item> _lastValue;
public SomeClass() // Constructor
{
_source = /* Initialize the source observable (hot) */
_lastValue = _source
.Catch(Observable.Never<Item>())
.Concat(Observable.Never<Item>())
.Publish(default)
.AutoConnect(0)
.FirstAsync();
}
public Item Value => _lastValue.Wait();

接受initialValue参数的Publish运算符。。。

返回一个可连接的可观察序列,该序列共享对基础序列的单个订阅,并以initialValue开头。该运算符是使用BehaviorSubject<T>Multicast的专门化。

BehaviorSubject<T>是一个专门的ISubject<T>,它。。。

表示一个随时间变化的值。观察者可以订阅主题以接收最后(或初始)值和所有后续通知。

添加了CatchConcat运算符,以便保留最后一个值,即使在源序列正常或异常完成的情况下也是如此。

就我个人而言,我会犹豫是否使用这个解决方案,因为在Do操作符中更新的volatile字段会更自然地完成同样的事情。我发布它主要是为了展示Rx的功能。

相关内容

  • 没有找到相关文章

最新更新