尝试对使用RX发送来自多个发布者的通知的系统进行建模。
我有两个自定义接口ITopicObservable和ITopicObserver来模拟实现类除了IObservable和IObserver接口之外还有其他属性和方法的事实。
我的问题是,我的想法是,我应该能够将多个可观察器添加在一起,将它们合并在一起,并订阅一个观察器,以提供所有合并的可观察性的更新。但是,带有"问题"注释的代码抛出了一个无效的强制转换异常。
用例是多个独立的传感器,每个传感器监测一个盒子中的温度,例如,将它们的所有报告聚合为一个温度报告,然后由温度健康监测器订阅。
我在这里错过了什么?或者有没有更好的方法来使用RX实现场景?
下方的代码
using System;
using System.Reactive.Linq;
using System.Collections.Generic;
namespace test
{
class MainClass
{
public static void Main (string[] args)
{
Console.WriteLine ("Hello World!");
var to = new TopicObserver ();
var s = new TopicObservable ("test");
var agg = new AggregatedTopicObservable ();
agg.Add (s);
agg.Subscribe (to);
}
}
public interface ITopicObservable<TType>:IObservable<TType>
{
string Name{get;}
}
public class TopicObservable:ITopicObservable<int>
{
public TopicObservable(string name)
{
Name = name;
}
#region IObservable implementation
public IDisposable Subscribe (IObserver<int> observer)
{
return null;
}
#endregion
#region ITopicObservable implementation
public string Name { get;private set;}
#endregion
}
public class AggregatedTopicObservable:ITopicObservable<int>
{
List<TopicObservable> _topics;
private ITopicObservable<int> _observable;
private IDisposable _disposable;
public AggregatedTopicObservable()
{
_topics = new List<TopicObservable>();
}
public void Add(ITopicObservable<int> observable)
{
_topics.Add ((TopicObservable)observable);
}
#region IObservable implementation
public IDisposable Subscribe (IObserver<int> observer)
{
_observable = (ITopicObservable<int>)_topics.Merge ();
_disposable = _observable.Subscribe(observer);
return _disposable;
}
#endregion
#region ITopicObservable implementation
public string Name { get;private set;}
#endregion
}
public interface ITopicObserver<TType>:IObserver<TType>
{
string Name{get;}
}
public class TopicObserver:ITopicObserver<int>
{
#region IObserver implementation
public void OnNext (int value)
{
Console.WriteLine ("next {0}", value);
}
public void OnError (Exception error)
{
Console.WriteLine ("error {0}", error.Message);
}
public void OnCompleted ()
{
Console.WriteLine ("finished");
}
#endregion
#region ITopicObserver implementation
public string Name { get;private set;}
#endregion
}
}
我的第一个想法是,不应该实现IObservable<T>
,应该通过将其公开为属性或方法的结果来组合它。
第二个想法是,Rx中有一些运算符擅长将多个序列合并/聚合在一起。你应该喜欢用那些。
第三,与第一个类似,您通常不实现IObserver<T>
,您只订阅可观察序列并为每个回调提供委托(OnNext
、OnError
和OnComplete
)
所以你的代码基本上被简化为
Console.WriteLine("Hello World!");
var topic1 = TopicListener("test1");
var topic2 = TopicListener("test2");
topic1.Merge(topic2)
.Subscribe(
val => { Console.WriteLine("One of the topics published this value {0}", val);},
ex => { Console.WriteLine("One of the topics errored. Now the whole sequence is dead {0}", ex);},
() => {Console.WriteLine("All topics have completed.");});
其中TopicListener(string)
只是一个返回IObservable<T>
的方法。TopicListener(string)
方法的实现很可能使用Observable.Create
。
查看在基于主题的消息传递系统上映射Rx的示例可能会有所帮助。这里有一个如何在TibRv主题上叠加Rx的示例https://github.com/LeeCampbell/RxCookbook/blob/master/IO/Comms/TibRvSample.linq
您正在使用的.Merge(...)
运算符的签名是:
IObservable<TSource> Merge<TSource>(this IEnumerable<IObservable<TSource>> sources)
此.Merge()
返回的实际类型为:
System.Reactive.Linq.ObservableImpl.Merge`1[System.Int32]
因此应该相当清楚的是,调用CCD_ 12将失败。
李建议不要执行IObservable<>
或IObserver<>
中的任何一个都是正确的。它会导致类似上面的错误。
如果你必须这样做,我会这样做:
public interface ITopic
{
string Name { get; }
}
public interface ITopicObservable<TType> : ITopic, IObservable<TType>
{ }
public interface ITopicSubject<TType> : ISubject<TType>, ITopicObservable<TType>
{ }
public interface ITopicObserver<TType> : ITopic, IObserver<TType>
{ }
public class Topic
{
public string Name { get; private set; }
public Topic(string name)
{
this.Name = name;
}
}
public class TopicSubject : Topic, ITopicSubject<int>
{
private Subject<int> _subject = new Subject<int>();
public TopicSubject(string name)
: base(name)
{ }
public IDisposable Subscribe(IObserver<int> observer)
{
return _subject.Subscribe(observer);
}
public void OnNext(int value)
{
_subject.OnNext(value);
}
public void OnError(Exception error)
{
_subject.OnError(error);
}
public void OnCompleted()
{
_subject.OnCompleted();
}
}
public class AggregatedTopicObservable : Topic, ITopicObservable<int>
{
List<ITopicObservable<int>> _topics = new List<ITopicObservable<int>>();
public AggregatedTopicObservable(string name)
: base(name)
{ }
public void Add(ITopicObservable<int> observable)
{
_topics.Add(observable);
}
public IDisposable Subscribe(IObserver<int> observer)
{
return _topics.Merge().Subscribe(observer);
}
}
public class TopicObserver : Topic, ITopicObserver<int>
{
private IObserver<int> _observer;
public TopicObserver(string name)
: base(name)
{
_observer =
Observer
.Create<int>(
value => Console.WriteLine("next {0}", value),
error => Console.WriteLine("error {0}", error.Message),
() => Console.WriteLine("finished"));
}
public void OnNext(int value)
{
_observer.OnNext(value);
}
public void OnError(Exception error)
{
_observer.OnError(error);
}
public void OnCompleted()
{
_observer.OnCompleted();
}
}
并使用运行
var to = new TopicObserver("watching");
var ts1 = new TopicSubject("topic 1");
var ts2 = new TopicSubject("topic 2");
var agg = new AggregatedTopicObservable("agg");
agg.Add(ts1);
agg.Add(ts2);
agg.Subscribe(to);
ts1.OnNext(42);
ts1.OnCompleted();
ts2.OnNext(1);
ts2.OnCompleted();
哪个给出:
下一个42下一个1已完成
但除了能够给每件事起一个名字(我不确定这有什么帮助)之外,你还可以这样做:
var to =
Observer
.Create<int>(
value => Console.WriteLine("next {0}", value),
error => Console.WriteLine("error {0}", error.Message),
() => Console.WriteLine("finished"));
var ts1 = new Subject<int>();
var ts2 = new Subject<int>();
var agg = new [] { ts1, ts2 }.Merge();
agg.Subscribe(to);
ts1.OnNext(42);
ts1.OnCompleted();
ts2.OnNext(1);
ts2.OnCompleted();
没有接口和类的相同输出。
还有一种更有趣的方式。试试这个:
var to =
Observer
.Create<int>(
value => Console.WriteLine("next {0}", value),
error => Console.WriteLine("error {0}", error.Message),
() => Console.WriteLine("finished"));
var agg = new Subject<IObservable<int>>();
agg.Merge().Subscribe(to);
var ts1 = new Subject<int>();
var ts2 = new Subject<int>();
agg.OnNext(ts1);
agg.OnNext(ts2);
ts1.OnNext(42);
ts1.OnCompleted();
ts2.OnNext(1);
ts2.OnCompleted();
var ts3 = new Subject<int>();
agg.OnNext(ts3);
ts3.OnNext(99);
ts3.OnCompleted();
这产生:
下一个42下一个1下一个99
它允许您在合并后添加新的源可观测性!