我有一个可观察的集合,为所谓的Channel
生成状态变化。我有一个ChannelSet
应该监视这些通道。
我想这样写:如果一个通道是可操作的,通道设置是启动的,否则,通道设置是关闭的。
IEnumerable<ChannelState> channelStates = ...;
if (channelStates.Any(cs => cs == ChannelState.Operational))
channelSet.ChannelSetState = ChannelSetState.Up;
else
channelSet.ChannelSetState = ChannelSetState.Down;
但是我从哪里得到我的IEnumerable<ChannelState>
?如果我有一个通道,我可以简单地订阅它的状态更改,并相应地修改通道集的状态。对于两个通道,我可以使用CombineLatest
:
Observable.CombineLatest(channel0States, channel1States, (cs0, cs1) =>
{
if (cs0 == ChannelSetState.Up || cs1 == ChannelSetState.Up)
return ChannelSetState.Up;
else
return ChannelSetState.Down;
});
但我有一个IEnumerable<Channel>
和相应的IEnumerable<IObservable<ChannelState>>
。我正在寻找类似CombineLatest
的东西,它不限于固定数量的可观测值。
使问题复杂化的是,通道集合可以添加到。例如,偶尔会添加一个通道。新通道还生成需要合并的状态更改。
所以我实际上要找的是一个函数:
IEnumerable<IObservable<ChannelState>> --> IObservable<ChannelSetState>
在输入更改时保持最新。应该有一些方法可以使用Rx来完成这一点,但我真的不知道如何。
有一种相当直接的方法可以用Rx做你想做的事情,但是你需要只考虑可观察对象,而不是混用枚举对象。
你真正需要考虑的函数签名是:
IObservable<IObservable<ChannelState>> --> IObservable<ChannelSetState>
函数如下:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();
重要的是,IObservable<IObservable<ChannelState>>
中的每个IObservable<ChannelState>
都能正常工作。
我假设ChannelState
枚举具有Idle
状态,并且每个IObservable<ChannelState>
在完成之前将产生零个或多个Operational
/Idle
值对(Operational
后跟Idle
)。
你还说"可以添加和删除通道的集合"-考虑到IEnumerable<IObservable<ChannelState>>
这听起来很合理-但在Rx中你不必担心删除,因为每个可观察对象都可以发出自己的完成信号。一旦它发出完成信号,就好像它已经从集合中删除了,因为它不能产生任何进一步的值。因此,您只需要担心添加到集合中—使用主题很容易。
现在这个函数可以这样使用:
var channelStatesSubject = new Subject<IObservable<ChannelState>>();
var channelStates = channelStatesSubject.AsObservable();
var channelSetStates = f(channelStates);
channelSetStates.Subscribe(css => { /* ChannelSetState subscription code */ });
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
// etc
我使用一些测试代码运行此代码,该代码使用三个随机的ChannelState
可观察对象,在f
函数中调用Do
进行调试,并得到以下序列:
1
Up
2
3
2
1
2
1
0
Down
1
Up
0
Down
我想这就是你想要的。如果我错过了什么,请告诉我。
根据下面的评论,ChannelState
枚举有多个状态,但只有Operational
意味着连接是正常的。所以很容易添加一个DistinctUntilChanged
运算符来隐藏多个"down"状态。下面是代码:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.DistinctUntilChanged()
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();
添加代码以确保第一个选择查询总是以1
开头。下面是代码:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.StartWith(1)
.DistinctUntilChanged()
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();
也许可以从IObservable<Channel>
开始,而不是从w/IEnumerable<Channel>
开始。这样做的一种方法是使用Subject<Channel>
,当一个新的创建时,OnNext()
它。
如果你需要一个列表,
xsChannels。订阅(项=>{锁(列表){list.add(项);}});
我答应添加我自己想出的解决方案,所以在这里。只要我还没有找到更好的,我会用这个,虽然我仍然认为必须有一个更好的方法:)
我使用一个使用ConcurrentDictionary
的类来保存每个注册的可观察对象的最新值。当一个可观察对象被取消注册时,它的最新值会被移除,与它关联的订阅也会被移除。
当任何已注册的可观察对象生成一个值时,所有最新的值都会被收集并发送给Subject
。
public class DynamicCombineLatest<T>
{
private readonly IDictionary<IObservable<T>, T> _latestValues =
new ConcurrentDictionary<IObservable<T>, T>();
private readonly IDictionary<IObservable<T>, IDisposable> _subscriptions =
new ConcurrentDictionary<IObservable<T>, IDisposable>();
private readonly ISubject<IEnumerable<T>> _result =
new Subject<IEnumerable<T>>();
public void AddObservable(IObservable<T> observable)
{
var subscription =
observable.Subscribe(t =>
{
_latestValues[observable] = t;
_result.OnNext(_latestValues.Values);
});
_subscriptions[observable] = subscription;
}
public void RemoveObservable(IObservable<T> observable)
{
_subscriptions[observable].Dispose();
_latestValues.Remove(observable);
_subscriptions.Remove(observable);
}
public IObservable<IEnumerable<T>> Result
{
get { return _result; }
}
}