如何动态地将n个可观察对象组合成一个列表



我有一个可观察的集合,为所谓的Channel生成状态变化。我有一个ChannelSet应该监视这些通道。

我想这样写:如果一个通道是可操作的,通道设置是启动的,否则,通道设置是关闭的。

IEnumerable<ChannelState> channelStates = ...;
if (channelStates.Any(cs => cs == ChannelState.Operational))
    channelSet.ChannelSetState = ChannelSetState.Up;
else
    channelSet.ChannelSetState = ChannelSetState.Down;

但是我从哪里得到我的IEnumerable<ChannelState> ?如果我有一个通道,我可以简单地订阅它的状态更改,并相应地修改通道集的状态。对于两个通道,我可以使用CombineLatest:

Observable.CombineLatest(channel0States, channel1States, (cs0, cs1) =>
    {
        if (cs0 == ChannelSetState.Up || cs1 == ChannelSetState.Up)
            return ChannelSetState.Up;
        else
            return ChannelSetState.Down;
    });

但我有一个IEnumerable<Channel>和相应的IEnumerable<IObservable<ChannelState>>。我正在寻找类似CombineLatest的东西,它不限于固定数量的可观测值。

使问题复杂化的是,通道集合可以添加到。例如,偶尔会添加一个通道。新通道还生成需要合并的状态更改。

所以我实际上要找的是一个函数:

IEnumerable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

在输入更改时保持最新。应该有一些方法可以使用Rx来完成这一点,但我真的不知道如何。

有一种相当直接的方法可以用Rx做你想做的事情,但是你需要只考虑可观察对象,而不是混用枚举对象。

你真正需要考虑的函数签名是:

IObservable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

函数如下:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

重要的是,IObservable<IObservable<ChannelState>>中的每个IObservable<ChannelState>都能正常工作。

我假设ChannelState枚举具有Idle状态,并且每个IObservable<ChannelState>在完成之前将产生零个或多个Operational/Idle值对(Operational后跟Idle)。

你还说"可以添加和删除通道的集合"-考虑到IEnumerable<IObservable<ChannelState>>这听起来很合理-但在Rx中你不必担心删除,因为每个可观察对象都可以发出自己的完成信号。一旦它发出完成信号,就好像它已经从集合中删除了,因为它不能产生任何进一步的值。因此,您只需要担心添加到集合中—使用主题很容易。

现在这个函数可以这样使用:

var channelStatesSubject = new Subject<IObservable<ChannelState>>();
var channelStates = channelStatesSubject.AsObservable();
var channelSetStates = f(channelStates);
channelSetStates.Subscribe(css => { /* ChannelSetState subscription code */ });
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
// etc

我使用一些测试代码运行此代码,该代码使用三个随机的ChannelState可观察对象,在f函数中调用Do进行调试,并得到以下序列:

1
Up
2
3
2
1
2
1
0
Down
1
Up
0
Down

我想这就是你想要的。如果我错过了什么,请告诉我。


根据下面的评论,ChannelState枚举有多个状态,但只有Operational意味着连接是正常的。所以很容易添加一个DistinctUntilChanged运算符来隐藏多个"down"状态。下面是代码:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

添加代码以确保第一个选择查询总是以1开头。下面是代码:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .StartWith(1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

也许可以从IObservable<Channel>开始,而不是从w/IEnumerable<Channel>开始。这样做的一种方法是使用Subject<Channel>,当一个新的创建时,OnNext()它。

如果你需要一个列表,

xsChannels。订阅(项=>{锁(列表){list.add(项);}});

我答应添加我自己想出的解决方案,所以在这里。只要我还没有找到更好的,我会用这个,虽然我仍然认为必须有一个更好的方法:)

我使用一个使用ConcurrentDictionary的类来保存每个注册的可观察对象的最新值。当一个可观察对象被取消注册时,它的最新值会被移除,与它关联的订阅也会被移除。

当任何已注册的可观察对象生成一个值时,所有最新的值都会被收集并发送给Subject

public class DynamicCombineLatest<T>
{
    private readonly IDictionary<IObservable<T>, T> _latestValues =
        new ConcurrentDictionary<IObservable<T>, T>();
    private readonly IDictionary<IObservable<T>, IDisposable> _subscriptions =
        new ConcurrentDictionary<IObservable<T>, IDisposable>();
    private readonly ISubject<IEnumerable<T>> _result =
        new Subject<IEnumerable<T>>();
    public void AddObservable(IObservable<T> observable)
    {
        var subscription =
            observable.Subscribe(t =>
                                 {
                                     _latestValues[observable] = t;
                                     _result.OnNext(_latestValues.Values);
                                 });
        _subscriptions[observable] = subscription;
    }
    public void RemoveObservable(IObservable<T> observable)
    {
        _subscriptions[observable].Dispose();
        _latestValues.Remove(observable);
        _subscriptions.Remove(observable);
    }
    public IObservable<IEnumerable<T>> Result
    {
        get { return _result; }
    }
}

相关内容

  • 没有找到相关文章