在RX中合并多个自定义可观测值



尝试对使用RX发送来自多个发布者的通知的系统进行建模。

我有两个自定义接口ITopicObservable和ITopicObserver来模拟实现类除了IObservable和IObserver接口之外还有其他属性和方法的事实。

我的问题是,我的想法是,我应该能够将多个可观察器添加在一起,将它们合并在一起,并订阅一个观察器,以提供所有合并的可观察性的更新。但是,带有"问题"注释的代码抛出了一个无效的强制转换异常。

用例是多个独立的传感器,每个传感器监测一个盒子中的温度,例如,将它们的所有报告聚合为一个温度报告,然后由温度健康监测器订阅。

我在这里错过了什么?或者有没有更好的方法来使用RX实现场景?

下方的代码

using System;
using System.Reactive.Linq;
using System.Collections.Generic;
namespace test
{
class MainClass
{
    public static void Main (string[] args)
    {
        Console.WriteLine ("Hello World!");
        var to = new TopicObserver ();
        var s = new TopicObservable ("test");
        var agg = new AggregatedTopicObservable ();
        agg.Add (s);
        agg.Subscribe (to);
    }
}
public interface ITopicObservable<TType>:IObservable<TType>
{
    string Name{get;}
}
public class TopicObservable:ITopicObservable<int>
{
    public TopicObservable(string name)
    {
        Name = name;
    }
    #region IObservable implementation
    public IDisposable Subscribe (IObserver<int> observer)
    {
        return null;
    }
    #endregion
    #region ITopicObservable implementation
    public string Name { get;private set;}
    #endregion
}
public class AggregatedTopicObservable:ITopicObservable<int>
{
    List<TopicObservable> _topics;
    private ITopicObservable<int> _observable;
    private IDisposable _disposable;
    public AggregatedTopicObservable()
    {
        _topics = new List<TopicObservable>();
    }
    public void Add(ITopicObservable<int> observable)
    {
        _topics.Add ((TopicObservable)observable);
    }
    #region IObservable implementation
    public IDisposable Subscribe (IObserver<int> observer)
    {
        _observable = (ITopicObservable<int>)_topics.Merge ();
        _disposable = _observable.Subscribe(observer);
        return _disposable;
    }
    #endregion
    #region ITopicObservable implementation
    public string Name { get;private set;}
    #endregion
}

public interface ITopicObserver<TType>:IObserver<TType>
{
    string Name{get;}
}
public class TopicObserver:ITopicObserver<int>
{
    #region IObserver implementation
    public void OnNext (int value)
    {
        Console.WriteLine ("next {0}", value);
    }
    public void OnError (Exception error)
    {
        Console.WriteLine ("error {0}", error.Message);
    }
    public void OnCompleted ()
    {
        Console.WriteLine ("finished");
    }
    #endregion
    #region ITopicObserver implementation
    public string Name { get;private set;}
    #endregion
}
}

我的第一个想法是,不应该实现IObservable<T>,应该通过将其公开为属性或方法的结果来组合它。

第二个想法是,Rx中有一些运算符擅长将多个序列合并/聚合在一起。你应该喜欢用那些。

第三,与第一个类似,您通常不实现IObserver<T>,您只订阅可观察序列并为每个回调提供委托(OnNextOnErrorOnComplete

所以你的代码基本上被简化为

Console.WriteLine("Hello World!");
var topic1 = TopicListener("test1");
var topic2 = TopicListener("test2");
topic1.Merge(topic2)
    .Subscribe(
    val => { Console.WriteLine("One of the topics published this value {0}", val);},
    ex => { Console.WriteLine("One of the topics errored. Now the whole sequence is dead {0}", ex);},
    () => {Console.WriteLine("All topics have completed.");});

其中TopicListener(string)只是一个返回IObservable<T>的方法。TopicListener(string)方法的实现很可能使用Observable.Create

查看在基于主题的消息传递系统上映射Rx的示例可能会有所帮助。这里有一个如何在TibRv主题上叠加Rx的示例https://github.com/LeeCampbell/RxCookbook/blob/master/IO/Comms/TibRvSample.linq

您正在使用的.Merge(...)运算符的签名是:

IObservable<TSource> Merge<TSource>(this IEnumerable<IObservable<TSource>> sources)

.Merge()返回的实际类型为:

System.Reactive.Linq.ObservableImpl.Merge`1[System.Int32]

因此应该相当清楚的是,调用CCD_ 12将失败。

李建议不要执行IObservable<>IObserver<>中的任何一个都是正确的。它会导致类似上面的错误。

如果你必须这样做,我会这样做:

public interface ITopic
{
    string Name { get; }
}
public interface ITopicObservable<TType> : ITopic, IObservable<TType>
{ }
public interface ITopicSubject<TType> : ISubject<TType>, ITopicObservable<TType>
{ }
public interface ITopicObserver<TType> : ITopic, IObserver<TType>
{ }
public class Topic
{
    public string Name { get; private set; }
    public Topic(string name)
    {
        this.Name = name;
    }
}
public class TopicSubject : Topic, ITopicSubject<int>
{
    private Subject<int> _subject = new Subject<int>();
    public TopicSubject(string name)
        : base(name)
    { }
    public IDisposable Subscribe(IObserver<int> observer)
    {
        return _subject.Subscribe(observer);
    }
    public void OnNext(int value)
    {
        _subject.OnNext(value);
    }
    public void OnError(Exception error)
    {
        _subject.OnError(error);
    }
    public void OnCompleted()
    {
        _subject.OnCompleted();
    }
}
public class AggregatedTopicObservable : Topic, ITopicObservable<int>
{
    List<ITopicObservable<int>> _topics = new List<ITopicObservable<int>>();
    public AggregatedTopicObservable(string name)
        : base(name)
    { }
    public void Add(ITopicObservable<int> observable)
    {
        _topics.Add(observable);
    }
    public IDisposable Subscribe(IObserver<int> observer)
    {
        return _topics.Merge().Subscribe(observer);
    }
}
public class TopicObserver : Topic, ITopicObserver<int>
{
    private IObserver<int> _observer;
    public TopicObserver(string name)
        : base(name)
    {
        _observer =
            Observer
                .Create<int>(
                    value => Console.WriteLine("next {0}", value),
                    error => Console.WriteLine("error {0}", error.Message),
                    () => Console.WriteLine("finished"));
    }
    public void OnNext(int value)
    {
        _observer.OnNext(value);
    }
    public void OnError(Exception error)
    {
        _observer.OnError(error);
    }
    public void OnCompleted()
    {
        _observer.OnCompleted();
    }
}

并使用运行

var to = new TopicObserver("watching");
var ts1 = new TopicSubject("topic 1");
var ts2 = new TopicSubject("topic 2");
var agg = new AggregatedTopicObservable("agg");
agg.Add(ts1);
agg.Add(ts2);
agg.Subscribe(to);
ts1.OnNext(42);
ts1.OnCompleted();
ts2.OnNext(1);
ts2.OnCompleted();

哪个给出:

下一个42下一个1已完成

但除了能够给每件事起一个名字(我不确定这有什么帮助)之外,你还可以这样做:

var to =
    Observer
        .Create<int>(
            value => Console.WriteLine("next {0}", value),
            error => Console.WriteLine("error {0}", error.Message),
            () => Console.WriteLine("finished"));
var ts1 = new Subject<int>();
var ts2 = new Subject<int>();
var agg = new [] { ts1, ts2 }.Merge();
agg.Subscribe(to);
ts1.OnNext(42);
ts1.OnCompleted();
ts2.OnNext(1);
ts2.OnCompleted();

没有接口和类的相同输出。

还有一种更有趣的方式。试试这个:

var to =
    Observer
        .Create<int>(
            value => Console.WriteLine("next {0}", value),
            error => Console.WriteLine("error {0}", error.Message),
            () => Console.WriteLine("finished"));
var agg = new Subject<IObservable<int>>();
agg.Merge().Subscribe(to);
var ts1 = new Subject<int>();
var ts2 = new Subject<int>();
agg.OnNext(ts1);
agg.OnNext(ts2);
ts1.OnNext(42);
ts1.OnCompleted();
ts2.OnNext(1);
ts2.OnCompleted();
var ts3 = new Subject<int>();
agg.OnNext(ts3);
ts3.OnNext(99);
ts3.OnCompleted();

这产生:

下一个42下一个1下一个99

它允许您在合并后添加新的源可观测性!

相关内容

  • 没有找到相关文章

最新更新